diff --git a/Cargo.lock b/Cargo.lock index 2db7cd9..4d0b960 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,6 +97,7 @@ dependencies = [ "quickcheck_macros", "rand", "serde", + "serde_json", "slab", "tungstenite", ] @@ -483,6 +484,7 @@ checksum = "96282e96bfcd3da0d3aa9938bedf1e50df3269b6db08b4876d2da0bb1a0841cf" dependencies = [ "ahash", "autocfg", + "serde", ] [[package]] diff --git a/TODO.md b/TODO.md index 123e87e..7f2bb09 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,8 @@ ## aquatic_ws * serde + * AnnonunceWrapper or ActionWrapper or the like with an action field + and serde flatten on inner message * handler ## aquatic diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index c71b4ae..d5a165d 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -17,7 +17,7 @@ path = "src/bin/main.rs" bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } flume = "0.7" -hashbrown = "0.7" +hashbrown = { version = "0.7", features = ["serde"] } histogram = "0.6" indexmap = "1" mimalloc = { version = "0.1", default-features = false } @@ -27,6 +27,7 @@ parking_lot = "0.10" privdrop = "0.3" rand = { version = "0.7", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } +serde_json = "1" slab = "0.4" tungstenite = "0.10" diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs index cf39ca2..322bcb2 100644 --- a/aquatic_ws/src/lib/network.rs +++ b/aquatic_ws/src/lib/network.rs @@ -4,7 +4,7 @@ use std::io::ErrorKind; use std::option::Option; use slab::Slab; -use tungstenite::{Message, WebSocket}; +use tungstenite::WebSocket; use mio::{Events, Poll, Interest, Token}; use mio::net::{TcpListener, TcpStream}; @@ -88,21 +88,16 @@ pub fn run_socket_worker( loop { if let Some(Some(connection)) = connections.get_mut(token.0){ match connection.ws.read_message(){ - Ok(message) => { - // TODO: convert tungstenite::Message to in_message + Ok(ws_message) => { + if let Some(in_message) = InMessage::from_ws_message(ws_message){ + let meta = ConnectionMeta { + socket_worker_index, + socket_worker_slab_index: token.0, + peer_socket_addr: connection.peer_socket_addr + }; - let meta = ConnectionMeta { - socket_worker_index, - socket_worker_slab_index: token.0, - peer_socket_addr: connection.peer_socket_addr - }; - - // Dummy in_message - let in_message = InMessage::ScrapeRequest(ScrapeRequest { - info_hashes: None, - }); - - in_message_sender.send((meta, in_message)); + in_message_sender.send((meta, in_message)); + } connection.valid_until = valid_until; }, @@ -166,11 +161,7 @@ pub fn run_socket_worker( }); // Read messages from channel, send to peers - // TODO: convert out_message to tungstenite::Message for (meta, out_message) in out_message_receiver.drain(){ - // dummy message - let message = Message::Text("test".to_string()); - let opt_connection = connections .get_mut(meta.socket_worker_slab_index); @@ -181,7 +172,7 @@ pub fn run_socket_worker( continue; } - match connection.ws.write_message(message){ + match connection.ws.write_message(out_message.to_ws_message()){ Ok(()) => {}, Err(tungstenite::Error::Io(err)) => { if err.kind() == ErrorKind::WouldBlock { diff --git a/aquatic_ws/src/lib/protocol.rs b/aquatic_ws/src/lib/protocol.rs index 0248198..3620262 100644 --- a/aquatic_ws/src/lib/protocol.rs +++ b/aquatic_ws/src/lib/protocol.rs @@ -1,14 +1,18 @@ use std::net::IpAddr; use hashbrown::HashMap; +use serde::{Serialize, Deserialize}; +#[derive(Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] pub struct PeerId(pub [u8; 20]); +#[derive(Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] pub struct InfoHash(pub [u8; 20]); +#[derive(Clone, Serialize)] pub struct ResponsePeer { pub peer_id: PeerId, pub ip: IpAddr, // From src socket addr @@ -17,6 +21,7 @@ pub struct ResponsePeer { } +#[derive(Clone, Deserialize)] pub enum AnnounceEvent { Started, Stopped, @@ -35,6 +40,7 @@ impl Default for AnnounceEvent { /// Apparently, these are sent to a number of peers when they are set /// in an AnnounceRequest /// action = "announce" +#[derive(Clone, Serialize)] pub struct MiddlemanOfferToPeer { pub peer_id: PeerId, // Peer id of peer sending offer pub info_hash: InfoHash, @@ -46,6 +52,7 @@ pub struct MiddlemanOfferToPeer { /// If announce request has answer = true, send this to peer with /// peer id == "to_peer_id" field /// Action field should be 'announce' +#[derive(Clone, Serialize)] pub struct MiddlemanAnswerToPeer { pub peer_id: PeerId, pub info_hash: InfoHash, @@ -55,12 +62,14 @@ pub struct MiddlemanAnswerToPeer { /// Element of AnnounceRequest.offers +#[derive(Clone, Deserialize)] pub struct AnnounceRequestOffer { pub offer: (), // TODO: Check client for what this is pub offer_id: (), // TODO: Check client for what this is } +#[derive(Clone, Deserialize)] pub struct AnnounceRequest { pub info_hash: InfoHash, // FIXME: I think these are actually really just strings with 20 len, same with peer id pub peer_id: PeerId, @@ -78,6 +87,7 @@ pub struct AnnounceRequest { } +#[derive(Clone, Serialize)] pub struct AnnounceResponse { pub info_hash: InfoHash, pub complete: usize, @@ -93,6 +103,7 @@ pub struct AnnounceResponse { } +#[derive(Clone, Deserialize)] pub struct ScrapeRequest { // If omitted, scrape for all torrents, apparently // There is some kind of parsing here too which accepts a single info hash @@ -101,6 +112,7 @@ pub struct ScrapeRequest { } +#[derive(Clone, Serialize)] pub struct ScrapeStatistics { pub complete: usize, pub incomplete: usize, @@ -108,12 +120,44 @@ pub struct ScrapeStatistics { } +#[derive(Clone, Serialize)] pub struct ScrapeResponse { pub files: HashMap, // InfoHash to Scrape stats pub flags: HashMap, } +#[derive(Clone, Serialize, Deserialize)] +pub enum Action { + Announce, + Scrape +} + + +#[derive(Clone, Serialize, Deserialize)] +struct ActionWrapper { + pub action: Action, + #[serde(flatten)] + pub inner: T, +} + + +impl ActionWrapper { + pub fn announce(t: T) -> Self { + Self { + action: Action::Announce, + inner: t + } + } + pub fn scrape(t: T) -> Self { + Self { + action: Action::Scrape, + inner: t + } + } +} + + pub enum InMessage { AnnounceRequest(AnnounceRequest), ScrapeRequest(ScrapeRequest), @@ -121,8 +165,34 @@ pub enum InMessage { impl InMessage { - fn from_ws_message(ws_messge: tungstenite::Message) -> Result { - unimplemented!() + /// Try parsing as announce request first. If that fails, try parsing as + /// scrape request, or return None + pub fn from_ws_message(ws_message: tungstenite::Message) -> Option { + use tungstenite::Message::{Text, Binary}; + + let text = match ws_message { + Text(text) => Some(text), + Binary(bytes) => String::from_utf8(bytes).ok(), + _ => None + }?; + + let res: Result, _> = serde_json::from_str(&text); + + if let Ok(wrapper) = res { + if let Action::Announce = wrapper.action { + return Some(InMessage::AnnounceRequest(wrapper.inner)); + } + } + + let res: Result, _> = serde_json::from_str(&text); + + if let Ok(wrapper) = res { + if let Action::Scrape = wrapper.action { + return Some(InMessage::ScrapeRequest(wrapper.inner)); + } + } + + None } } @@ -136,7 +206,30 @@ pub enum OutMessage { impl OutMessage { - fn to_ws_message(self) -> tungstenite::Message { - unimplemented!() + pub fn to_ws_message(self) -> tungstenite::Message { + let json = match self { + Self::AnnounceResponse(message) => { + serde_json::to_string( + &ActionWrapper::announce(message) + ).unwrap() + }, + Self::Offer(message) => { + serde_json::to_string( + &ActionWrapper::announce(message) + ).unwrap() + }, + Self::Answer(message) => { + serde_json::to_string( + &ActionWrapper::announce(message) + ).unwrap() + }, + Self::ScrapeResponse(message) => { + serde_json::to_string( + &ActionWrapper::scrape(message) + ).unwrap() + }, + }; + + tungstenite::Message::from(json) } } \ No newline at end of file