aquatic_ws: pass on offers to other peers; minor fixes

This commit is contained in:
Joakim Frostegård 2020-05-08 21:05:46 +02:00
parent f2ae494902
commit 5b58db90e3
5 changed files with 130 additions and 31 deletions

1
Cargo.lock generated
View file

@ -82,6 +82,7 @@ dependencies = [
name = "aquatic_ws" name = "aquatic_ws"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"aquatic",
"bittorrent_udp", "bittorrent_udp",
"cli_helpers", "cli_helpers",
"flume", "flume",

View file

@ -14,6 +14,7 @@ name = "aquatic_ws"
path = "src/bin/main.rs" path = "src/bin/main.rs"
[dependencies] [dependencies]
aquatic = { path = "../aquatic" }
bittorrent_udp = { path = "../bittorrent_udp" } bittorrent_udp = { path = "../bittorrent_udp" }
cli_helpers = { path = "../cli_helpers" } cli_helpers = { path = "../cli_helpers" }
flume = "0.7" flume = "0.7"

View file

@ -1,5 +1,4 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Instant;
use std::sync::Arc; use std::sync::Arc;
use flume::{Sender, Receiver}; use flume::{Sender, Receiver};
@ -7,17 +6,56 @@ use hashbrown::HashMap;
use indexmap::IndexMap; use indexmap::IndexMap;
use parking_lot::Mutex; use parking_lot::Mutex;
pub use aquatic::common::ValidUntil;
use crate::protocol::*; use crate::protocol::*;
pub struct ValidUntil(pub Instant); #[derive(Clone, Copy)]
pub struct ConnectionMeta {
/// Index of socket worker responsible for this connection. Required for
/// sending back response through correct channel to correct worker.
pub socket_worker_index: usize,
/// SocketAddr of peer
pub peer_socket_addr: SocketAddr,
/// Slab index of PeerConnection
pub socket_worker_slab_index: usize,
}
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum PeerStatus {
Seeding,
Leeching,
Stopped
}
impl PeerStatus {
/// Determine peer status from announce event and number of bytes left.
///
/// Likely, the last branch will be taken most of the time.
#[inline]
pub fn from_event_and_bytes_left(
event: AnnounceEvent,
bytes_left: usize
) -> Self {
if let AnnounceEvent::Stopped = event {
Self::Stopped
} else if bytes_left == 0 {
Self::Seeding
} else {
Self::Leeching
}
}
}
#[derive(Clone, Copy)]
pub struct Peer { pub struct Peer {
pub peer_id: PeerId, // FIXME: maybe this field can be removed
pub complete: bool,
pub valid_until: ValidUntil,
pub connection_meta: ConnectionMeta, pub connection_meta: ConnectionMeta,
pub status: PeerStatus,
pub valid_until: ValidUntil,
} }
@ -26,8 +64,8 @@ pub type PeerMap = IndexMap<PeerId, Peer>;
pub struct TorrentData { pub struct TorrentData {
pub peers: PeerMap, pub peers: PeerMap,
pub seeders: usize, pub num_seeders: usize,
pub leechers: usize, pub num_leechers: usize,
} }
@ -35,8 +73,8 @@ impl Default for TorrentData {
fn default() -> Self { fn default() -> Self {
Self { Self {
peers: IndexMap::new(), peers: IndexMap::new(),
seeders: 0, num_seeders: 0,
leechers: 0, num_leechers: 0,
} }
} }
} }
@ -59,18 +97,6 @@ impl Default for State {
} }
#[derive(Clone, Copy)]
pub struct ConnectionMeta {
/// Index of socket worker responsible for this connection. Required for
/// sending back response through correct channel to correct worker.
pub socket_worker_index: usize,
/// SocketAddr of peer
pub peer_socket_addr: SocketAddr,
/// Slab index of PeerConnection
pub socket_worker_slab_index: usize,
}
pub type InMessageSender = Sender<(ConnectionMeta, InMessage)>; pub type InMessageSender = Sender<(ConnectionMeta, InMessage)>;
pub type InMessageReceiver = Receiver<(ConnectionMeta, InMessage)>; pub type InMessageReceiver = Receiver<(ConnectionMeta, InMessage)>;
pub type OutMessageReceiver = Receiver<(ConnectionMeta, OutMessage)>; pub type OutMessageReceiver = Receiver<(ConnectionMeta, OutMessage)>;

View file

@ -3,6 +3,9 @@ use std::vec::Drain;
use hashbrown::HashMap; use hashbrown::HashMap;
use parking_lot::MutexGuard; use parking_lot::MutexGuard;
use rand::{Rng, SeedableRng, rngs::SmallRng};
use aquatic::handlers::{extract_response_peers};
use crate::common::*; use crate::common::*;
use crate::protocol::*; use crate::protocol::*;
@ -18,6 +21,8 @@ pub fn run_request_worker(
let mut announce_requests = Vec::new(); let mut announce_requests = Vec::new();
let mut scrape_requests = Vec::new(); let mut scrape_requests = Vec::new();
let mut rng = SmallRng::from_entropy();
let timeout = Duration::from_micros(200); let timeout = Duration::from_micros(200);
loop { loop {
@ -51,6 +56,7 @@ pub fn run_request_worker(
.unwrap_or_else(|| state.torrents.lock()); .unwrap_or_else(|| state.torrents.lock());
handle_announce_requests( handle_announce_requests(
&mut rng,
&mut torrent_map_guard, &mut torrent_map_guard,
&mut out_messages, &mut out_messages,
announce_requests.drain(..) announce_requests.drain(..)
@ -72,20 +78,85 @@ pub fn run_request_worker(
pub fn handle_announce_requests( pub fn handle_announce_requests(
rng: &mut impl Rng,
torrents: &mut TorrentMap, torrents: &mut TorrentMap,
messages_out: &mut Vec<(ConnectionMeta, OutMessage)>, messages_out: &mut Vec<(ConnectionMeta, OutMessage)>,
requests: Drain<(ConnectionMeta, AnnounceRequest)>, requests: Drain<(ConnectionMeta, AnnounceRequest)>,
){ ){
let valid_until = ValidUntil::new(240);
for (sender_meta, request) in requests { for (sender_meta, request) in requests {
let torrent_data = torrents.entry(request.info_hash) let torrent_data = torrents.entry(request.info_hash)
.or_default(); .or_default();
// TODO: insert peer, update stats etc
if let Some(offers) = request.offers { let peer_status = PeerStatus::from_event_and_bytes_left(
// if offers are set, fetch same number of peers, send offers to all of them request.event,
request.bytes_left
);
let peer = Peer {
connection_meta: sender_meta,
status: peer_status,
valid_until,
};
let opt_removed_peer = match peer_status {
PeerStatus::Leeching => {
torrent_data.num_leechers += 1;
torrent_data.peers.insert(request.peer_id, peer)
},
PeerStatus::Seeding => {
torrent_data.num_seeders += 1;
torrent_data.peers.insert(request.peer_id, peer)
},
PeerStatus::Stopped => {
torrent_data.peers.remove(&request.peer_id)
}
};
match opt_removed_peer.map(|peer| peer.status){
Some(PeerStatus::Leeching) => {
torrent_data.num_leechers -= 1;
},
Some(PeerStatus::Seeding) => {
torrent_data.num_seeders -= 1;
},
_ => {}
} }
// If peer sent offers, send them on to random peers
if let Some(offers) = request.offers {
let max_num_peers_to_take = offers.len().min(10); // FIXME: config
fn f(peer: &Peer) -> Peer {
*peer
}
let peers = extract_response_peers(
rng,
&torrent_data.peers,
max_num_peers_to_take,
f
);
for (offer, peer) in offers.into_iter().zip(peers){
let middleman_offer = MiddlemanOfferToPeer {
info_hash: request.info_hash,
peer_id: request.peer_id,
offer: offer.offer,
offer_id: offer.offer_id,
};
messages_out.push((
peer.connection_meta,
OutMessage::Offer(middleman_offer)
));
}
}
// If peer sent answer, send it on to relevant peer
match (request.answer, request.to_peer_id, request.offer_id){ match (request.answer, request.to_peer_id, request.offer_id){
(Some(answer), Some(to_peer_id), Some(offer_id)) => { (Some(answer), Some(to_peer_id), Some(offer_id)) => {
if let Some(to_peer) = torrent_data.peers.get(&to_peer_id){ if let Some(to_peer) = torrent_data.peers.get(&to_peer_id){
@ -107,9 +178,9 @@ pub fn handle_announce_requests(
let response = OutMessage::AnnounceResponse(AnnounceResponse { let response = OutMessage::AnnounceResponse(AnnounceResponse {
info_hash: request.info_hash, info_hash: request.info_hash,
complete: torrent_data.seeders, complete: torrent_data.num_seeders,
incomplete: torrent_data.leechers, incomplete: torrent_data.num_leechers,
announce_interval: 120, announce_interval: 120, // FIXME: config
}); });
messages_out.push((sender_meta, response)); messages_out.push((sender_meta, response));
@ -138,9 +209,9 @@ pub fn handle_scrape_requests(
for info_hash in info_hashes { for info_hash in info_hashes {
if let Some(torrent_data) = torrents.get(&info_hash){ if let Some(torrent_data) = torrents.get(&info_hash){
let stats = ScrapeStatistics { let stats = ScrapeStatistics {
complete: torrent_data.seeders, complete: torrent_data.num_seeders,
downloaded: 0, // No implementation planned downloaded: 0, // No implementation planned
incomplete: torrent_data.leechers, incomplete: torrent_data.num_leechers,
}; };
response.files.insert(info_hash, stats); response.files.insert(info_hash, stats);

View file

@ -86,7 +86,7 @@ pub struct AnnounceRequest {
pub peer_id: PeerId, pub peer_id: PeerId,
/// Just called "left" in protocol /// Just called "left" in protocol
#[serde(rename = "left")] #[serde(rename = "left")]
pub bytes_left: bool, pub bytes_left: usize, // FIXME: I had this set as bool before, check!
/// Can be empty. Then, default is "update" /// Can be empty. Then, default is "update"
#[serde(default)] #[serde(default)]
pub event: AnnounceEvent, pub event: AnnounceEvent,