diff --git a/Cargo.lock b/Cargo.lock index 4d0b960..f5b0188 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -82,6 +82,7 @@ dependencies = [ name = "aquatic_ws" version = "0.1.0" dependencies = [ + "aquatic", "bittorrent_udp", "cli_helpers", "flume", diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index d5a165d..7d708b1 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -14,6 +14,7 @@ name = "aquatic_ws" path = "src/bin/main.rs" [dependencies] +aquatic = { path = "../aquatic" } bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } flume = "0.7" diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs index 00e5b28..ce3687c 100644 --- a/aquatic_ws/src/lib/common.rs +++ b/aquatic_ws/src/lib/common.rs @@ -1,5 +1,4 @@ use std::net::SocketAddr; -use std::time::Instant; use std::sync::Arc; use flume::{Sender, Receiver}; @@ -7,17 +6,56 @@ use hashbrown::HashMap; use indexmap::IndexMap; use parking_lot::Mutex; +pub use aquatic::common::ValidUntil; + use crate::protocol::*; -pub struct ValidUntil(pub Instant); +#[derive(Clone, Copy)] +pub struct ConnectionMeta { + /// Index of socket worker responsible for this connection. Required for + /// sending back response through correct channel to correct worker. + pub socket_worker_index: usize, + /// SocketAddr of peer + pub peer_socket_addr: SocketAddr, + /// Slab index of PeerConnection + pub socket_worker_slab_index: usize, +} +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub enum PeerStatus { + Seeding, + Leeching, + Stopped +} + + +impl PeerStatus { + /// Determine peer status from announce event and number of bytes left. + /// + /// Likely, the last branch will be taken most of the time. + #[inline] + pub fn from_event_and_bytes_left( + event: AnnounceEvent, + bytes_left: usize + ) -> Self { + if let AnnounceEvent::Stopped = event { + Self::Stopped + } else if bytes_left == 0 { + Self::Seeding + } else { + Self::Leeching + } + } +} + + +#[derive(Clone, Copy)] pub struct Peer { - pub peer_id: PeerId, // FIXME: maybe this field can be removed - pub complete: bool, - pub valid_until: ValidUntil, pub connection_meta: ConnectionMeta, + pub status: PeerStatus, + pub valid_until: ValidUntil, } @@ -26,8 +64,8 @@ pub type PeerMap = IndexMap; pub struct TorrentData { pub peers: PeerMap, - pub seeders: usize, - pub leechers: usize, + pub num_seeders: usize, + pub num_leechers: usize, } @@ -35,8 +73,8 @@ impl Default for TorrentData { fn default() -> Self { Self { peers: IndexMap::new(), - seeders: 0, - leechers: 0, + num_seeders: 0, + num_leechers: 0, } } } @@ -59,18 +97,6 @@ impl Default for State { } -#[derive(Clone, Copy)] -pub struct ConnectionMeta { - /// Index of socket worker responsible for this connection. Required for - /// sending back response through correct channel to correct worker. - pub socket_worker_index: usize, - /// SocketAddr of peer - pub peer_socket_addr: SocketAddr, - /// Slab index of PeerConnection - pub socket_worker_slab_index: usize, -} - - pub type InMessageSender = Sender<(ConnectionMeta, InMessage)>; pub type InMessageReceiver = Receiver<(ConnectionMeta, InMessage)>; pub type OutMessageReceiver = Receiver<(ConnectionMeta, OutMessage)>; diff --git a/aquatic_ws/src/lib/handler.rs b/aquatic_ws/src/lib/handler.rs index 85f9ef5..80ee7ec 100644 --- a/aquatic_ws/src/lib/handler.rs +++ b/aquatic_ws/src/lib/handler.rs @@ -3,6 +3,9 @@ use std::vec::Drain; use hashbrown::HashMap; use parking_lot::MutexGuard; +use rand::{Rng, SeedableRng, rngs::SmallRng}; + +use aquatic::handlers::{extract_response_peers}; use crate::common::*; use crate::protocol::*; @@ -18,6 +21,8 @@ pub fn run_request_worker( let mut announce_requests = Vec::new(); let mut scrape_requests = Vec::new(); + let mut rng = SmallRng::from_entropy(); + let timeout = Duration::from_micros(200); loop { @@ -51,6 +56,7 @@ pub fn run_request_worker( .unwrap_or_else(|| state.torrents.lock()); handle_announce_requests( + &mut rng, &mut torrent_map_guard, &mut out_messages, announce_requests.drain(..) @@ -72,20 +78,85 @@ pub fn run_request_worker( pub fn handle_announce_requests( + rng: &mut impl Rng, torrents: &mut TorrentMap, messages_out: &mut Vec<(ConnectionMeta, OutMessage)>, requests: Drain<(ConnectionMeta, AnnounceRequest)>, ){ + let valid_until = ValidUntil::new(240); + for (sender_meta, request) in requests { let torrent_data = torrents.entry(request.info_hash) .or_default(); - - // TODO: insert peer, update stats etc - if let Some(offers) = request.offers { - // if offers are set, fetch same number of peers, send offers to all of them + let peer_status = PeerStatus::from_event_and_bytes_left( + request.event, + request.bytes_left + ); + + let peer = Peer { + connection_meta: sender_meta, + status: peer_status, + valid_until, + }; + + let opt_removed_peer = match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers += 1; + + torrent_data.peers.insert(request.peer_id, peer) + }, + PeerStatus::Seeding => { + torrent_data.num_seeders += 1; + + torrent_data.peers.insert(request.peer_id, peer) + }, + PeerStatus::Stopped => { + torrent_data.peers.remove(&request.peer_id) + } + }; + + match opt_removed_peer.map(|peer| peer.status){ + Some(PeerStatus::Leeching) => { + torrent_data.num_leechers -= 1; + }, + Some(PeerStatus::Seeding) => { + torrent_data.num_seeders -= 1; + }, + _ => {} } + // If peer sent offers, send them on to random peers + if let Some(offers) = request.offers { + let max_num_peers_to_take = offers.len().min(10); // FIXME: config + + fn f(peer: &Peer) -> Peer { + *peer + } + + let peers = extract_response_peers( + rng, + &torrent_data.peers, + max_num_peers_to_take, + f + ); + + for (offer, peer) in offers.into_iter().zip(peers){ + let middleman_offer = MiddlemanOfferToPeer { + info_hash: request.info_hash, + peer_id: request.peer_id, + offer: offer.offer, + offer_id: offer.offer_id, + }; + + messages_out.push(( + peer.connection_meta, + OutMessage::Offer(middleman_offer) + )); + } + } + + // If peer sent answer, send it on to relevant peer match (request.answer, request.to_peer_id, request.offer_id){ (Some(answer), Some(to_peer_id), Some(offer_id)) => { if let Some(to_peer) = torrent_data.peers.get(&to_peer_id){ @@ -107,9 +178,9 @@ pub fn handle_announce_requests( let response = OutMessage::AnnounceResponse(AnnounceResponse { info_hash: request.info_hash, - complete: torrent_data.seeders, - incomplete: torrent_data.leechers, - announce_interval: 120, + complete: torrent_data.num_seeders, + incomplete: torrent_data.num_leechers, + announce_interval: 120, // FIXME: config }); messages_out.push((sender_meta, response)); @@ -138,9 +209,9 @@ pub fn handle_scrape_requests( for info_hash in info_hashes { if let Some(torrent_data) = torrents.get(&info_hash){ let stats = ScrapeStatistics { - complete: torrent_data.seeders, + complete: torrent_data.num_seeders, downloaded: 0, // No implementation planned - incomplete: torrent_data.leechers, + incomplete: torrent_data.num_leechers, }; response.files.insert(info_hash, stats); diff --git a/aquatic_ws/src/lib/protocol.rs b/aquatic_ws/src/lib/protocol.rs index 9f2b0d9..9060fda 100644 --- a/aquatic_ws/src/lib/protocol.rs +++ b/aquatic_ws/src/lib/protocol.rs @@ -86,7 +86,7 @@ pub struct AnnounceRequest { pub peer_id: PeerId, /// Just called "left" in protocol #[serde(rename = "left")] - pub bytes_left: bool, + pub bytes_left: usize, // FIXME: I had this set as bool before, check! /// Can be empty. Then, default is "update" #[serde(default)] pub event: AnnounceEvent,