aquatic_ws: add tungstenite::Message encode and decode, not tested

This commit is contained in:
Joakim Frostegård 2020-05-07 19:34:28 +02:00
parent c94b3d7d3c
commit 4d0c3d309a
5 changed files with 114 additions and 25 deletions

View file

@ -17,7 +17,7 @@ path = "src/bin/main.rs"
bittorrent_udp = { path = "../bittorrent_udp" }
cli_helpers = { path = "../cli_helpers" }
flume = "0.7"
hashbrown = "0.7"
hashbrown = { version = "0.7", features = ["serde"] }
histogram = "0.6"
indexmap = "1"
mimalloc = { version = "0.1", default-features = false }
@ -27,6 +27,7 @@ parking_lot = "0.10"
privdrop = "0.3"
rand = { version = "0.7", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
slab = "0.4"
tungstenite = "0.10"

View file

@ -4,7 +4,7 @@ use std::io::ErrorKind;
use std::option::Option;
use slab::Slab;
use tungstenite::{Message, WebSocket};
use tungstenite::WebSocket;
use mio::{Events, Poll, Interest, Token};
use mio::net::{TcpListener, TcpStream};
@ -88,21 +88,16 @@ pub fn run_socket_worker(
loop {
if let Some(Some(connection)) = connections.get_mut(token.0){
match connection.ws.read_message(){
Ok(message) => {
// TODO: convert tungstenite::Message to in_message
Ok(ws_message) => {
if let Some(in_message) = InMessage::from_ws_message(ws_message){
let meta = ConnectionMeta {
socket_worker_index,
socket_worker_slab_index: token.0,
peer_socket_addr: connection.peer_socket_addr
};
let meta = ConnectionMeta {
socket_worker_index,
socket_worker_slab_index: token.0,
peer_socket_addr: connection.peer_socket_addr
};
// Dummy in_message
let in_message = InMessage::ScrapeRequest(ScrapeRequest {
info_hashes: None,
});
in_message_sender.send((meta, in_message));
in_message_sender.send((meta, in_message));
}
connection.valid_until = valid_until;
},
@ -166,11 +161,7 @@ pub fn run_socket_worker(
});
// Read messages from channel, send to peers
// TODO: convert out_message to tungstenite::Message
for (meta, out_message) in out_message_receiver.drain(){
// dummy message
let message = Message::Text("test".to_string());
let opt_connection = connections
.get_mut(meta.socket_worker_slab_index);
@ -181,7 +172,7 @@ pub fn run_socket_worker(
continue;
}
match connection.ws.write_message(message){
match connection.ws.write_message(out_message.to_ws_message()){
Ok(()) => {},
Err(tungstenite::Error::Io(err)) => {
if err.kind() == ErrorKind::WouldBlock {

View file

@ -1,14 +1,18 @@
use std::net::IpAddr;
use hashbrown::HashMap;
use serde::{Serialize, Deserialize};
#[derive(Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerId(pub [u8; 20]);
#[derive(Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
pub struct InfoHash(pub [u8; 20]);
#[derive(Clone, Serialize)]
pub struct ResponsePeer {
pub peer_id: PeerId,
pub ip: IpAddr, // From src socket addr
@ -17,6 +21,7 @@ pub struct ResponsePeer {
}
#[derive(Clone, Deserialize)]
pub enum AnnounceEvent {
Started,
Stopped,
@ -35,6 +40,7 @@ impl Default for AnnounceEvent {
/// Apparently, these are sent to a number of peers when they are set
/// in an AnnounceRequest
/// action = "announce"
#[derive(Clone, Serialize)]
pub struct MiddlemanOfferToPeer {
pub peer_id: PeerId, // Peer id of peer sending offer
pub info_hash: InfoHash,
@ -46,6 +52,7 @@ pub struct MiddlemanOfferToPeer {
/// If announce request has answer = true, send this to peer with
/// peer id == "to_peer_id" field
/// Action field should be 'announce'
#[derive(Clone, Serialize)]
pub struct MiddlemanAnswerToPeer {
pub peer_id: PeerId,
pub info_hash: InfoHash,
@ -55,12 +62,14 @@ pub struct MiddlemanAnswerToPeer {
/// Element of AnnounceRequest.offers
#[derive(Clone, Deserialize)]
pub struct AnnounceRequestOffer {
pub offer: (), // TODO: Check client for what this is
pub offer_id: (), // TODO: Check client for what this is
}
#[derive(Clone, Deserialize)]
pub struct AnnounceRequest {
pub info_hash: InfoHash, // FIXME: I think these are actually really just strings with 20 len, same with peer id
pub peer_id: PeerId,
@ -78,6 +87,7 @@ pub struct AnnounceRequest {
}
#[derive(Clone, Serialize)]
pub struct AnnounceResponse {
pub info_hash: InfoHash,
pub complete: usize,
@ -93,6 +103,7 @@ pub struct AnnounceResponse {
}
#[derive(Clone, Deserialize)]
pub struct ScrapeRequest {
// If omitted, scrape for all torrents, apparently
// There is some kind of parsing here too which accepts a single info hash
@ -101,6 +112,7 @@ pub struct ScrapeRequest {
}
#[derive(Clone, Serialize)]
pub struct ScrapeStatistics {
pub complete: usize,
pub incomplete: usize,
@ -108,12 +120,44 @@ pub struct ScrapeStatistics {
}
#[derive(Clone, Serialize)]
pub struct ScrapeResponse {
pub files: HashMap<InfoHash, ScrapeStatistics>, // InfoHash to Scrape stats
pub flags: HashMap<String, usize>,
}
#[derive(Clone, Serialize, Deserialize)]
pub enum Action {
Announce,
Scrape
}
#[derive(Clone, Serialize, Deserialize)]
struct ActionWrapper<T> {
pub action: Action,
#[serde(flatten)]
pub inner: T,
}
impl <T>ActionWrapper<T> {
pub fn announce(t: T) -> Self {
Self {
action: Action::Announce,
inner: t
}
}
pub fn scrape(t: T) -> Self {
Self {
action: Action::Scrape,
inner: t
}
}
}
pub enum InMessage {
AnnounceRequest(AnnounceRequest),
ScrapeRequest(ScrapeRequest),
@ -121,8 +165,34 @@ pub enum InMessage {
impl InMessage {
fn from_ws_message(ws_messge: tungstenite::Message) -> Result<Self, ()> {
unimplemented!()
/// Try parsing as announce request first. If that fails, try parsing as
/// scrape request, or return None
pub fn from_ws_message(ws_message: tungstenite::Message) -> Option<Self> {
use tungstenite::Message::{Text, Binary};
let text = match ws_message {
Text(text) => Some(text),
Binary(bytes) => String::from_utf8(bytes).ok(),
_ => None
}?;
let res: Result<ActionWrapper<AnnounceRequest>, _> = serde_json::from_str(&text);
if let Ok(wrapper) = res {
if let Action::Announce = wrapper.action {
return Some(InMessage::AnnounceRequest(wrapper.inner));
}
}
let res: Result<ActionWrapper<ScrapeRequest>, _> = serde_json::from_str(&text);
if let Ok(wrapper) = res {
if let Action::Scrape = wrapper.action {
return Some(InMessage::ScrapeRequest(wrapper.inner));
}
}
None
}
}
@ -136,7 +206,30 @@ pub enum OutMessage {
impl OutMessage {
fn to_ws_message(self) -> tungstenite::Message {
unimplemented!()
pub fn to_ws_message(self) -> tungstenite::Message {
let json = match self {
Self::AnnounceResponse(message) => {
serde_json::to_string(
&ActionWrapper::announce(message)
).unwrap()
},
Self::Offer(message) => {
serde_json::to_string(
&ActionWrapper::announce(message)
).unwrap()
},
Self::Answer(message) => {
serde_json::to_string(
&ActionWrapper::announce(message)
).unwrap()
},
Self::ScrapeResponse(message) => {
serde_json::to_string(
&ActionWrapper::scrape(message)
).unwrap()
},
};
tungstenite::Message::from(json)
}
}