diff --git a/aquatic/src/common.rs b/aquatic/src/common.rs new file mode 100644 index 0000000..9d281ac --- /dev/null +++ b/aquatic/src/common.rs @@ -0,0 +1,133 @@ +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::net::{SocketAddr, IpAddr}; +use std::time::Instant; + +use dashmap::DashMap; +use indexmap::IndexMap; + +pub use bittorrent_udp::types::*; + + +pub const EVENT_CAPACITY: usize = 4096; +pub const MAX_PACKET_SIZE: usize = 4096; + + +#[derive(Debug, Clone, Copy)] +pub struct Time(pub Instant); + + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ConnectionKey { + pub connection_id: ConnectionId, + pub socket_addr: SocketAddr +} + +pub type ConnectionMap = DashMap; + + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub enum PeerStatus { + Seeding, + Leeching, + Stopped +} + +impl PeerStatus { + /// Determine peer status from announce event and number of bytes left. + /// + /// Likely, the last branch will be taken most of the time. + pub fn from_event_and_bytes_left( + event: AnnounceEvent, + bytes_left: NumberOfBytes + ) -> Self { + if event == AnnounceEvent::Stopped { + Self::Stopped + } else if bytes_left.0 == 0 { + Self::Seeding + } else { + Self::Leeching + } + } +} + +#[derive(Clone, Debug)] +pub struct Peer { + pub id: PeerId, + pub connection_id: ConnectionId, + pub ip_address: IpAddr, + pub port: Port, + pub status: PeerStatus, + pub last_announce: Time +} + + +impl Peer { + pub fn to_response_peer(&self) -> ResponsePeer { + ResponsePeer { + ip_address: self.ip_address, + port: self.port + } + } + pub fn from_announce_and_ip( + announce_request: &AnnounceRequest, + ip_address: IpAddr + ) -> Self { + Self { + id: announce_request.peer_id, + connection_id: announce_request.connection_id, + ip_address, + port: announce_request.port, + status: PeerStatus::from_event_and_bytes_left( + announce_request.event, + announce_request.bytes_left + ), + last_announce: Time(Instant::now()) + } + } +} + +#[derive(PartialEq, Eq, Hash, Clone)] +pub struct PeerMapKey { + pub ip: IpAddr, + pub peer_id: PeerId +} + + +pub type PeerMap = IndexMap; + +pub struct TorrentData { + pub peers: PeerMap, + pub num_seeders: AtomicUsize, + pub num_leechers: AtomicUsize, +} + + +impl Default for TorrentData { + fn default() -> Self { + Self { + peers: IndexMap::new(), + num_seeders: AtomicUsize::new(0), + num_leechers: AtomicUsize::new(0), + } + } +} + + +pub type TorrentMap = DashMap; + + +#[derive(Clone)] +pub struct State { + pub connections: Arc, + pub torrents: Arc, +} + +impl State { + pub fn new() -> Self { + Self { + connections: Arc::new(DashMap::new()), + torrents: Arc::new(DashMap::new()), + } + } +} \ No newline at end of file diff --git a/aquatic/src/handler.rs b/aquatic/src/handler.rs index 6c786b9..6ea5f2e 100644 --- a/aquatic/src/handler.rs +++ b/aquatic/src/handler.rs @@ -1,25 +1,24 @@ use std::net::SocketAddr; use std::sync::atomic::Ordering; use std::time::Instant; +use std::vec::Drain; use rand::{self, SeedableRng, rngs::SmallRng, thread_rng}; use rand::seq::IteratorRandom; use bittorrent_udp::types::*; -use crate::types::*; +use crate::common::*; -pub fn gen_responses( +pub fn handle_connect_requests( state: &State, - connect_requests: Vec<(ConnectRequest, SocketAddr)>, - announce_requests: Vec<(AnnounceRequest, SocketAddr)> -)-> Vec<(Response, SocketAddr)> { - let mut responses = Vec::new(); - + responses: &mut Vec<(Response, SocketAddr)>, + requests: Drain<(ConnectRequest, SocketAddr)>, +){ let now = Time(Instant::now()); - for (request, src) in connect_requests { + for (request, src) in requests { let connection_id = ConnectionId(rand::random()); let key = ConnectionKey { @@ -36,8 +35,15 @@ pub fn gen_responses( } ), src)); } +} - for (request, src) in announce_requests { + +pub fn handle_announce_requests( + state: &State, + responses: &mut Vec<(Response, SocketAddr)>, + requests: Drain<(AnnounceRequest, SocketAddr)>, +){ + for (request, src) in requests { let connection_key = ConnectionKey { connection_id: request.connection_id, socket_addr: src, @@ -101,8 +107,37 @@ pub fn gen_responses( responses.push((response, src)); } +} - responses + +pub fn handle_scrape_requests( + state: &State, + responses: &mut Vec<(Response, SocketAddr)>, + requests: Drain<(ScrapeRequest, SocketAddr)>, +){ + let empty_stats = create_torrent_scrape_statistics(0, 0); + + for (request, src) in requests { + let mut stats: Vec = Vec::with_capacity(256); + + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = state.torrents.get(info_hash){ + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders.load(Ordering::SeqCst) as i32, + torrent_data.num_leechers.load(Ordering::SeqCst) as i32, + )); + } else { + stats.push(empty_stats); + } + } + + let response = Response::Scrape(ScrapeResponse { + transaction_id: request.transaction_id, + torrent_stats: stats, + }); + + responses.push((response, src)); + } } @@ -131,4 +166,16 @@ pub fn extract_response_peers( .map(Peer::to_response_peer) .choose_multiple(&mut rng, number_of_peers_to_take) } +} + + +pub fn create_torrent_scrape_statistics( + seeders: i32, + leechers: i32 +) -> TorrentScrapeStatistics { + TorrentScrapeStatistics { + seeders: NumberOfPeers(seeders), + completed: NumberOfDownloads(0), // No implementation planned + leechers: NumberOfPeers(leechers) + } } \ No newline at end of file diff --git a/aquatic/src/main.rs b/aquatic/src/main.rs index 613186f..03e0046 100644 --- a/aquatic/src/main.rs +++ b/aquatic/src/main.rs @@ -1,10 +1,10 @@ use std::time::Duration; +mod common; mod handler; mod network; -mod types; -use types::State; +use common::State; fn main(){ @@ -17,9 +17,9 @@ fn main(){ let state = state.clone(); ::std::thread::spawn(move || { - network::run_event_loop(state, socket, i, 4096, Duration::from_millis(1000)); + network::run_event_loop(state, socket, i, Duration::from_millis(1000)); }); } - network::run_event_loop(state, socket, 0, 4096, Duration::from_millis(1000)); + network::run_event_loop(state, socket, 0, Duration::from_millis(1000)); } diff --git a/aquatic/src/network.rs b/aquatic/src/network.rs index 28680d3..2f1405c 100644 --- a/aquatic/src/network.rs +++ b/aquatic/src/network.rs @@ -10,7 +10,7 @@ use net2::unix::UnixUdpBuilderExt; use bittorrent_udp::types::IpVersion; use bittorrent_udp::converters::{response_to_bytes, request_from_bytes}; -use crate::types::*; +use crate::common::*; use crate::handler::*; @@ -52,21 +52,25 @@ pub fn run_event_loop( state: State, socket: ::std::net::UdpSocket, token_num: usize, - event_capacity: usize, poll_timeout: Duration, ){ - let mut buffer = [0u8; 4096]; + let mut buffer = [0u8; MAX_PACKET_SIZE]; let mut socket = UdpSocket::from_std(socket); let mut poll = Poll::new().expect("create poll"); - let interests = Interest::READABLE | Interest::WRITABLE; + let interests = Interest::READABLE; poll.registry() .register(&mut socket, Token(token_num), interests) .unwrap(); - let mut events = Events::with_capacity(event_capacity); + let mut events = Events::with_capacity(EVENT_CAPACITY); + + let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::with_capacity(1024); + let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::with_capacity(1024); + let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::with_capacity(1024); + let mut responses: Vec<(Response, SocketAddr)> = Vec::with_capacity(1024); loop { poll.poll(&mut events, Some(poll_timeout)) @@ -77,9 +81,6 @@ pub fn run_event_loop( if token.0 == token_num { if event.is_readable(){ - let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::with_capacity(event_capacity); - let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::with_capacity(event_capacity); - loop { match socket.recv_from(&mut buffer) { Ok((amt, src)) => { @@ -95,6 +96,9 @@ pub fn run_event_loop( Request::Announce(r) => { announce_requests.push((r, src)); }, + Request::Scrape(r) => { + scrape_requests.push((r, src)); + }, _ => { // FIXME } @@ -115,13 +119,23 @@ pub fn run_event_loop( } } - let responses = gen_responses( + handle_connect_requests( &state, - connect_requests, - announce_requests + &mut responses, + connect_requests.drain(..) + ); + handle_announce_requests( + &state, + &mut responses, + announce_requests.drain(..), + ); + handle_scrape_requests( + &state, + &mut responses, + scrape_requests.drain(..), ); - for (response, src) in responses { + for (response, src) in responses.drain(..) { let bytes = response_to_bytes(&response, IpVersion::IPv4); match socket.send_to(&bytes[..], src){