mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-02 02:35:31 +00:00
ws: use idiomatic folder structure
This commit is contained in:
parent
aa3253fcd6
commit
d20e57d861
17 changed files with 0 additions and 2 deletions
189
aquatic_ws/src/common/handlers.rs
Normal file
189
aquatic_ws/src/common/handlers.rs
Normal file
|
|
@ -0,0 +1,189 @@
|
|||
use aquatic_common::extract_response_peers;
|
||||
use hashbrown::HashMap;
|
||||
use rand::Rng;
|
||||
|
||||
use aquatic_ws_protocol::*;
|
||||
|
||||
use crate::common::*;
|
||||
use crate::config::Config;
|
||||
|
||||
pub fn handle_announce_request(
|
||||
config: &Config,
|
||||
rng: &mut impl Rng,
|
||||
torrent_maps: &mut TorrentMaps,
|
||||
out_messages: &mut Vec<(ConnectionMeta, OutMessage)>,
|
||||
valid_until: ValidUntil,
|
||||
request_sender_meta: ConnectionMeta,
|
||||
request: AnnounceRequest,
|
||||
) {
|
||||
let torrent_data: &mut TorrentData = if request_sender_meta.converted_peer_ip.is_ipv4() {
|
||||
torrent_maps.ipv4.entry(request.info_hash).or_default()
|
||||
} else {
|
||||
torrent_maps.ipv6.entry(request.info_hash).or_default()
|
||||
};
|
||||
|
||||
// If there is already a peer with this peer_id, check that socket
|
||||
// addr is same as that of request sender. Otherwise, ignore request.
|
||||
// Since peers have access to each others peer_id's, they could send
|
||||
// requests using them, causing all sorts of issues. Checking naive
|
||||
// (non-converted) socket addresses is enough, since state is split
|
||||
// on converted peer ip.
|
||||
if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) {
|
||||
if request_sender_meta.naive_peer_addr != previous_peer.connection_meta.naive_peer_addr {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
::log::trace!("received request from {:?}", request_sender_meta);
|
||||
|
||||
// Insert/update/remove peer who sent this request
|
||||
{
|
||||
let peer_status = PeerStatus::from_event_and_bytes_left(
|
||||
request.event.unwrap_or_default(),
|
||||
request.bytes_left,
|
||||
);
|
||||
|
||||
let peer = Peer {
|
||||
connection_meta: request_sender_meta,
|
||||
status: peer_status,
|
||||
valid_until,
|
||||
};
|
||||
|
||||
let opt_removed_peer = match peer_status {
|
||||
PeerStatus::Leeching => {
|
||||
torrent_data.num_leechers += 1;
|
||||
|
||||
torrent_data.peers.insert(request.peer_id, peer)
|
||||
}
|
||||
PeerStatus::Seeding => {
|
||||
torrent_data.num_seeders += 1;
|
||||
|
||||
torrent_data.peers.insert(request.peer_id, peer)
|
||||
}
|
||||
PeerStatus::Stopped => torrent_data.peers.remove(&request.peer_id),
|
||||
};
|
||||
|
||||
match opt_removed_peer.map(|peer| peer.status) {
|
||||
Some(PeerStatus::Leeching) => {
|
||||
torrent_data.num_leechers -= 1;
|
||||
}
|
||||
Some(PeerStatus::Seeding) => {
|
||||
torrent_data.num_seeders -= 1;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// If peer sent offers, send them on to random peers
|
||||
if let Some(offers) = request.offers {
|
||||
// FIXME: config: also maybe check this when parsing request
|
||||
let max_num_peers_to_take = offers.len().min(config.protocol.max_offers);
|
||||
|
||||
#[inline]
|
||||
fn f(peer: &Peer) -> Peer {
|
||||
*peer
|
||||
}
|
||||
|
||||
let offer_receivers: Vec<Peer> = extract_response_peers(
|
||||
rng,
|
||||
&torrent_data.peers,
|
||||
max_num_peers_to_take,
|
||||
request.peer_id,
|
||||
f,
|
||||
);
|
||||
|
||||
for (offer, offer_receiver) in offers.into_iter().zip(offer_receivers) {
|
||||
let middleman_offer = MiddlemanOfferToPeer {
|
||||
action: AnnounceAction,
|
||||
info_hash: request.info_hash,
|
||||
peer_id: request.peer_id,
|
||||
offer: offer.offer,
|
||||
offer_id: offer.offer_id,
|
||||
};
|
||||
|
||||
out_messages.push((
|
||||
offer_receiver.connection_meta,
|
||||
OutMessage::Offer(middleman_offer),
|
||||
));
|
||||
::log::trace!(
|
||||
"sending middleman offer to {:?}",
|
||||
offer_receiver.connection_meta
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// If peer sent answer, send it on to relevant peer
|
||||
if let (Some(answer), Some(answer_receiver_id), Some(offer_id)) =
|
||||
(request.answer, request.to_peer_id, request.offer_id)
|
||||
{
|
||||
if let Some(answer_receiver) = torrent_data.peers.get(&answer_receiver_id) {
|
||||
let middleman_answer = MiddlemanAnswerToPeer {
|
||||
action: AnnounceAction,
|
||||
peer_id: request.peer_id,
|
||||
info_hash: request.info_hash,
|
||||
answer,
|
||||
offer_id,
|
||||
};
|
||||
|
||||
out_messages.push((
|
||||
answer_receiver.connection_meta,
|
||||
OutMessage::Answer(middleman_answer),
|
||||
));
|
||||
::log::trace!(
|
||||
"sending middleman answer to {:?}",
|
||||
answer_receiver.connection_meta
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let out_message = OutMessage::AnnounceResponse(AnnounceResponse {
|
||||
action: AnnounceAction,
|
||||
info_hash: request.info_hash,
|
||||
complete: torrent_data.num_seeders,
|
||||
incomplete: torrent_data.num_leechers,
|
||||
announce_interval: config.protocol.peer_announce_interval,
|
||||
});
|
||||
|
||||
out_messages.push((request_sender_meta, out_message));
|
||||
}
|
||||
|
||||
pub fn handle_scrape_request(
|
||||
config: &Config,
|
||||
torrent_maps: &mut TorrentMaps,
|
||||
out_messages: &mut Vec<(ConnectionMeta, OutMessage)>,
|
||||
meta: ConnectionMeta,
|
||||
request: ScrapeRequest,
|
||||
) {
|
||||
let info_hashes = if let Some(info_hashes) = request.info_hashes {
|
||||
info_hashes.as_vec()
|
||||
} else {
|
||||
return;
|
||||
};
|
||||
|
||||
let num_to_take = info_hashes.len().min(config.protocol.max_scrape_torrents);
|
||||
|
||||
let mut out_message = ScrapeResponse {
|
||||
action: ScrapeAction,
|
||||
files: HashMap::with_capacity(num_to_take),
|
||||
};
|
||||
|
||||
let torrent_map: &mut TorrentMap = if meta.converted_peer_ip.is_ipv4() {
|
||||
&mut torrent_maps.ipv4
|
||||
} else {
|
||||
&mut torrent_maps.ipv6
|
||||
};
|
||||
|
||||
for info_hash in info_hashes.into_iter().take(num_to_take) {
|
||||
if let Some(torrent_data) = torrent_map.get(&info_hash) {
|
||||
let stats = ScrapeStatistics {
|
||||
complete: torrent_data.num_seeders,
|
||||
downloaded: 0, // No implementation planned
|
||||
incomplete: torrent_data.num_leechers,
|
||||
};
|
||||
|
||||
out_message.files.insert(info_hash, stats);
|
||||
}
|
||||
}
|
||||
|
||||
out_messages.push((meta, OutMessage::ScrapeResponse(out_message)));
|
||||
}
|
||||
144
aquatic_ws/src/common/mod.rs
Normal file
144
aquatic_ws/src/common/mod.rs
Normal file
|
|
@ -0,0 +1,144 @@
|
|||
pub mod handlers;
|
||||
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
|
||||
use aquatic_common::AHashIndexMap;
|
||||
|
||||
pub use aquatic_common::ValidUntil;
|
||||
|
||||
use aquatic_ws_protocol::*;
|
||||
|
||||
use crate::config::Config;
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct PendingScrapeId(pub usize);
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct ConsumerId(pub usize);
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct ConnectionId(pub usize);
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct ConnectionMeta {
|
||||
/// Index of socket worker responsible for this connection. Required for
|
||||
/// sending back response through correct channel to correct worker.
|
||||
pub out_message_consumer_id: ConsumerId,
|
||||
pub connection_id: ConnectionId,
|
||||
/// Peer address as received from socket, meaning it wasn't converted to
|
||||
/// an IPv4 address if it was a IPv4-mapped IPv6 address
|
||||
pub naive_peer_addr: SocketAddr,
|
||||
pub converted_peer_ip: IpAddr,
|
||||
pub pending_scrape_id: Option<PendingScrapeId>,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
|
||||
pub enum PeerStatus {
|
||||
Seeding,
|
||||
Leeching,
|
||||
Stopped,
|
||||
}
|
||||
|
||||
impl PeerStatus {
|
||||
/// Determine peer status from announce event and number of bytes left.
|
||||
///
|
||||
/// Likely, the last branch will be taken most of the time.
|
||||
#[inline]
|
||||
pub fn from_event_and_bytes_left(event: AnnounceEvent, opt_bytes_left: Option<usize>) -> Self {
|
||||
if let AnnounceEvent::Stopped = event {
|
||||
Self::Stopped
|
||||
} else if let Some(0) = opt_bytes_left {
|
||||
Self::Seeding
|
||||
} else {
|
||||
Self::Leeching
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct Peer {
|
||||
pub connection_meta: ConnectionMeta,
|
||||
pub status: PeerStatus,
|
||||
pub valid_until: ValidUntil,
|
||||
}
|
||||
|
||||
pub type PeerMap = AHashIndexMap<PeerId, Peer>;
|
||||
|
||||
pub struct TorrentData {
|
||||
pub peers: PeerMap,
|
||||
pub num_seeders: usize,
|
||||
pub num_leechers: usize,
|
||||
}
|
||||
|
||||
impl Default for TorrentData {
|
||||
#[inline]
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
peers: Default::default(),
|
||||
num_seeders: 0,
|
||||
num_leechers: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type TorrentMap = AHashIndexMap<InfoHash, TorrentData>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct TorrentMaps {
|
||||
pub ipv4: TorrentMap,
|
||||
pub ipv6: TorrentMap,
|
||||
}
|
||||
|
||||
impl TorrentMaps {
|
||||
pub fn clean(&mut self, config: &Config, access_list: &Arc<AccessListArcSwap>) {
|
||||
let mut access_list_cache = create_access_list_cache(access_list);
|
||||
|
||||
Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv4);
|
||||
Self::clean_torrent_map(config, &mut access_list_cache, &mut self.ipv6);
|
||||
}
|
||||
|
||||
fn clean_torrent_map(
|
||||
config: &Config,
|
||||
access_list_cache: &mut AccessListCache,
|
||||
torrent_map: &mut TorrentMap,
|
||||
) {
|
||||
let now = Instant::now();
|
||||
|
||||
torrent_map.retain(|info_hash, torrent_data| {
|
||||
if !access_list_cache
|
||||
.load()
|
||||
.allows(config.access_list.mode, &info_hash.0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
let num_seeders = &mut torrent_data.num_seeders;
|
||||
let num_leechers = &mut torrent_data.num_leechers;
|
||||
|
||||
torrent_data.peers.retain(|_, peer| {
|
||||
let keep = peer.valid_until.0 >= now;
|
||||
|
||||
if !keep {
|
||||
match peer.status {
|
||||
PeerStatus::Seeding => {
|
||||
*num_seeders -= 1;
|
||||
}
|
||||
PeerStatus::Leeching => {
|
||||
*num_leechers -= 1;
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
}
|
||||
|
||||
keep
|
||||
});
|
||||
|
||||
!torrent_data.peers.is_empty()
|
||||
});
|
||||
|
||||
torrent_map.shrink_to_fit();
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue