diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index c7f54c6..3497d23 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -5,9 +5,9 @@ authors = ["Joakim FrostegÄrd "] edition = "2018" license = "Apache-2.0" -# [lib] -# name = "aquatic_ws" -# path = "src/lib/lib.rs" +[lib] +name = "aquatic_ws" +path = "src/lib/lib.rs" [[bin]] name = "aquatic_ws" diff --git a/aquatic_ws/src/bin/main.rs b/aquatic_ws/src/bin/main.rs index 5903872..1c46e7f 100644 --- a/aquatic_ws/src/bin/main.rs +++ b/aquatic_ws/src/bin/main.rs @@ -1,216 +1,6 @@ -//! There is not much point in doing more work until more clarity on -//! exact protocol is achieved - -use std::net::SocketAddr; -use std::time::{Duration, Instant}; -use std::io::ErrorKind; -use std::option::Option; - -use slab::Slab; -use tungstenite::{Message, WebSocket}; - -use mio::{Events, Poll, Interest, Token}; -use mio::net::{TcpListener, TcpStream}; - - -pub struct PeerConnection { - pub ws: WebSocket, - pub peer_socket_addr: SocketAddr, - pub valid_until: Instant, -} - - -/// First thoughts on what to send to handler -pub struct HandlerMessage { - /// Index of socket worker that read this request. Required for sending - /// back response through correct channel to correct worker. - pub socket_worker_index: usize, - /// FIXME: Should this be parsed request? - pub message: T, - /// SocketAddr of peer - pub peer_socket_addr: SocketAddr, - /// Slab index of PeerConnection - pub peer_connection_index: usize, -} - - -fn run_network_worker(){ - let address: SocketAddr = "0.0.0.0:3000".parse().unwrap(); - - let mut listener = TcpListener::bind(address).unwrap(); - let mut poll = Poll::new().expect("create poll"); - - poll.registry() - .register(&mut listener, Token(0), Interest::READABLE) - .unwrap(); - - let mut events = Events::with_capacity(1024); - - let timeout = Duration::from_millis(50); - - let mut connections: Slab> = Slab::new(); - - // Insert empty first entry to prevent assignment of index 0 - assert_eq!(connections.insert(None), 0); - - loop { - poll.poll(&mut events, Some(timeout)) - .expect("failed polling"); - - let valid_until = Instant::now() + Duration::from_secs(600); - - for event in events.iter(){ - let token = event.token(); - - if token.0 == 0 { - loop { - match listener.accept(){ - Ok((mut stream, src)) => { - let entry = connections.vacant_entry(); - let token = Token(entry.key()); - - poll.registry() - .register(&mut stream, token, Interest::READABLE) - .unwrap(); - - // FIXME: will this cause issues due to blocking? - // Should handshake be started manually below - // instead? - let ws = tungstenite::server::accept(stream).unwrap(); - - let peer_connection = PeerConnection { - ws, - peer_socket_addr: src, - valid_until, - }; - - entry.insert(Some(peer_connection)); - }, - Err(err) => { - if err.kind() == ErrorKind::WouldBlock { - break - } - - eprint!("{}", err); - } - } - } - } else if event.is_readable(){ - loop { - if let Some(Some(connection)) = connections.get_mut(token.0){ - match connection.ws.read_message(){ - Ok(message) => { - // FIXME: parse message, send to handler - // through channel (flume?) - - connection.valid_until = valid_until; - }, - Err(tungstenite::Error::Io(err)) => { - if err.kind() == ErrorKind::WouldBlock { - break - } - - eprint!("{}", err); - }, - Err(tungstenite::Error::ConnectionClosed) => { - // FIXME: necessary? - poll.registry().deregister(connection.ws.get_mut()).unwrap(); - - connections.remove(token.0); - }, - Err(err) => { - eprint!("{}", err); - } - } - } - } - } - } - - let now = Instant::now(); - - // Close connections after some time of inactivity and write pending - // messages (which is required after closing anyway.) - // - // FIXME: peers need to be removed too, wherever they are stored - connections.retain(|_, opt_connection| { - if let Some(connection) = opt_connection { - if connection.valid_until < now { - connection.ws.close(None).unwrap(); - } - - loop { - match connection.ws.write_pending(){ - Err(tungstenite::Error::Io(err)) => { - if err.kind() == ErrorKind::WouldBlock { - break - } - }, - Err(tungstenite::Error::ConnectionClosed) => { - // FIXME: necessary? - poll.registry() - .deregister(connection.ws.get_mut()) - .unwrap(); - - return false; - }, - _ => {} - } - } - } - - true - }); - - // TODO: loop through responses from channel, write them to wss or - // possibly register ws as writable (but this means event capacity - // must be adjusted accordingy and is limiting) - - // How should IP's be handled? Send index and src to processing and - // lookup on return that entry is correct. Old ideas: - // Maybe use IndexMap and use numerical - // index for token? Removing element from IndexMap requires shifting - // or swapping indeces, so not very good. - for _ in 0..100 { - let connection_index = 1; - let peer_socket_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); - let message = Message::Text("test".to_string()); - - let opt_connection = connections - .get_mut(connection_index); - - if let Some(Some(connection)) = opt_connection { - if connection.peer_socket_addr != peer_socket_addr { - continue; - } - - match connection.ws.write_message(message){ - Ok(()) => {}, - Err(tungstenite::Error::Io(err)) => { - if err.kind() == ErrorKind::WouldBlock { - continue; - } - - eprint!("{}", err); - }, - Err(tungstenite::Error::ConnectionClosed) => { - // FIXME: necessary? - poll.registry() - .deregister(connection.ws.get_mut()) - .unwrap(); - - connections.remove(connection_index); - }, - Err(err) => { - eprint!("{}", err); - }, - } - } - } - } -} +use aquatic_ws; fn main(){ - run_network_worker(); + aquatic_ws::run_network_worker(); } \ No newline at end of file diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs new file mode 100644 index 0000000..424f625 --- /dev/null +++ b/aquatic_ws/src/lib/lib.rs @@ -0,0 +1,213 @@ +//! There is not much point in doing more work until more clarity on +//! exact protocol is achieved + +use std::net::{SocketAddr}; +use std::time::{Duration, Instant}; +use std::io::ErrorKind; +use std::option::Option; + +use slab::Slab; +use tungstenite::{Message, WebSocket}; + +use mio::{Events, Poll, Interest, Token}; +use mio::net::{TcpListener, TcpStream}; + +pub mod protocol; + + +pub struct PeerConnection { + pub ws: WebSocket, + pub peer_socket_addr: SocketAddr, + pub valid_until: Instant, +} + + +/// First thoughts on what to send to handler +pub struct HandlerMessage { + /// Index of socket worker that read this request. Required for sending + /// back response through correct channel to correct worker. + pub socket_worker_index: usize, + /// FIXME: Should this be parsed request? + pub message: T, + /// SocketAddr of peer + pub peer_socket_addr: SocketAddr, + /// Slab index of PeerConnection + pub peer_connection_index: usize, +} + + +pub fn run_network_worker(){ + let address: SocketAddr = "0.0.0.0:3000".parse().unwrap(); + + let mut listener = TcpListener::bind(address).unwrap(); + let mut poll = Poll::new().expect("create poll"); + + poll.registry() + .register(&mut listener, Token(0), Interest::READABLE) + .unwrap(); + + let mut events = Events::with_capacity(1024); + + let timeout = Duration::from_millis(50); + + let mut connections: Slab> = Slab::new(); + + // Insert empty first entry to prevent assignment of index 0 + assert_eq!(connections.insert(None), 0); + + loop { + poll.poll(&mut events, Some(timeout)) + .expect("failed polling"); + + let valid_until = Instant::now() + Duration::from_secs(600); + + for event in events.iter(){ + let token = event.token(); + + if token.0 == 0 { + loop { + match listener.accept(){ + Ok((mut stream, src)) => { + let entry = connections.vacant_entry(); + let token = Token(entry.key()); + + poll.registry() + .register(&mut stream, token, Interest::READABLE) + .unwrap(); + + // FIXME: will this cause issues due to blocking? + // Should handshake be started manually below + // instead? + let ws = tungstenite::server::accept(stream).unwrap(); + + let peer_connection = PeerConnection { + ws, + peer_socket_addr: src, + valid_until, + }; + + entry.insert(Some(peer_connection)); + }, + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + break + } + + eprint!("{}", err); + } + } + } + } else if event.is_readable(){ + loop { + if let Some(Some(connection)) = connections.get_mut(token.0){ + match connection.ws.read_message(){ + Ok(message) => { + // FIXME: parse message, send to handler + // through channel (flume?) + + connection.valid_until = valid_until; + }, + Err(tungstenite::Error::Io(err)) => { + if err.kind() == ErrorKind::WouldBlock { + break + } + + eprint!("{}", err); + }, + Err(tungstenite::Error::ConnectionClosed) => { + // FIXME: necessary? + poll.registry().deregister(connection.ws.get_mut()).unwrap(); + + connections.remove(token.0); + }, + Err(err) => { + eprint!("{}", err); + } + } + } + } + } + } + + let now = Instant::now(); + + // Close connections after some time of inactivity and write pending + // messages (which is required after closing anyway.) + // + // FIXME: peers need to be removed too, wherever they are stored + connections.retain(|_, opt_connection| { + if let Some(connection) = opt_connection { + if connection.valid_until < now { + connection.ws.close(None).unwrap(); + } + + loop { + match connection.ws.write_pending(){ + Err(tungstenite::Error::Io(err)) => { + if err.kind() == ErrorKind::WouldBlock { + break + } + }, + Err(tungstenite::Error::ConnectionClosed) => { + // FIXME: necessary? + poll.registry() + .deregister(connection.ws.get_mut()) + .unwrap(); + + return false; + }, + _ => {} + } + } + } + + true + }); + + // TODO: loop through responses from channel, write them to wss or + // possibly register ws as writable (but this means event capacity + // must be adjusted accordingy and is limiting) + + // How should IP's be handled? Send index and src to processing and + // lookup on return that entry is correct. Old ideas: + // Maybe use IndexMap and use numerical + // index for token? Removing element from IndexMap requires shifting + // or swapping indeces, so not very good. + for _ in 0..100 { + let connection_index = 1; + let peer_socket_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + let message = Message::Text("test".to_string()); + + let opt_connection = connections + .get_mut(connection_index); + + if let Some(Some(connection)) = opt_connection { + if connection.peer_socket_addr != peer_socket_addr { + continue; + } + + match connection.ws.write_message(message){ + Ok(()) => {}, + Err(tungstenite::Error::Io(err)) => { + if err.kind() == ErrorKind::WouldBlock { + continue; + } + + eprint!("{}", err); + }, + Err(tungstenite::Error::ConnectionClosed) => { + // FIXME: necessary? + poll.registry() + .deregister(connection.ws.get_mut()) + .unwrap(); + + connections.remove(connection_index); + }, + Err(err) => { + eprint!("{}", err); + }, + } + } + } + } +} \ No newline at end of file diff --git a/aquatic_ws/src/lib/protocol.rs b/aquatic_ws/src/lib/protocol.rs new file mode 100644 index 0000000..ce7a3ae --- /dev/null +++ b/aquatic_ws/src/lib/protocol.rs @@ -0,0 +1,108 @@ +use std::collections::HashMap; +use std::net::IpAddr; + + +/// TODO: will need to store socket worker index and connection index +/// for middleman activities, also save SocketAddr instead of IP and port, +/// maybe, for comparison in socket worker +pub struct Peer { + pub complete: bool, // bytes_left == 0 + pub peer_id: [u8; 20], + pub ip: IpAddr, // From src socket addr + pub port: u16, // From src port +} + + +pub enum AnnounceEvent { + Started, + Stopped, + Completed, + Update +} + + +/// Apparently, these are sent to a number of peers when they are set +/// in an AnnounceRequest +/// action = "announce" +pub struct MiddlemanOfferToPeer { + pub offer: (), // Gets copied from AnnounceRequestOffer + pub offer_id: (), // Gets copied from AnnounceRequestOffer + pub peer_id: [u8; 20], // Peer id of peer sending offer + pub info_hash: [u8; 20], +} + + +/// If announce request has answer = true, send this to peer with +/// peer id == "to_peer_id" field +/// Action field should be 'announce' +pub struct MiddlemanAnswerToPeer { + pub answer: bool, + pub offer_id: (), + pub peer_id: [u8; 20], + pub info_hash: [u8; 20], +} + + +/// Element of AnnounceRequest.offers +pub struct AnnounceRequestOffer { + pub offer: (), // TODO: Check client for what this is + pub offer_id: (), // TODO: Check client for what this is +} + + +pub struct AnnounceRequest { + pub info_hash: [u8; 20], // FIXME: I think these are actually really just strings with 20 len, same with peer id + pub peer_id: [u8; 20], + pub bytes_left: bool, // Just called "left" in protocol + pub event: AnnounceEvent, // Can be empty? Then, default is "update" + + // Length of this is number of peers wanted? + // Only when this is an array offers are sent + pub offers: Option>, + + /// If false, send response before sending offers (or possibly "skip sending update back"?) + /// If true, send MiddlemanAnswerToPeer to peer with "to_peer_id" as peer_id. + pub answer: bool, + pub to_peer_id: Option<[u8; 20]>, // Only parsed to hex if answer == true, probably undefined otherwise +} + + +pub struct AnnounceResponse { + pub info_hash: [u8; 20], + pub complete: usize, + pub incomplete: usize, + // I suspect receivers don't care about this and rely on offers instead?? + // Also, what does it contain, exacly? + pub peers: Vec<()>, + pub interval: usize, // Default 2 min probably + + // Sent to "to_peer_id" peer (?? or did I put this into MiddlemanAnswerToPeer instead?) + pub offer_id: (), + pub answer: bool, +} + + + +pub struct ScrapeRequest { + // If omitted, scrape for all torrents, apparently + // There is some kind of parsing here too which accepts a single info hash + // and puts it into a vector + pub info_hashes: Option>, +} + + +pub struct ScrapeStatistics { + pub complete: usize, + pub incomplete: usize, + pub downloaded: usize, +} + + +pub struct ScrapeResponse { + pub files: HashMap<[u8; 20], ScrapeStatistics>, // InfoHash to Scrape stats + pub flags: HashMap, +} +//pub struct ScrapeResponse { +// pub complete: usize, +// pub incomplete: usize, +//} \ No newline at end of file