From 75c8ccd523863191f92cb02f3ba4409544faef79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 10 May 2020 15:25:22 +0200 Subject: [PATCH] WIP: try to get mio + tungstenite working --- aquatic_ws/src/lib/lib.rs | 2 +- aquatic_ws/src/lib/network.rs | 135 +++++++++++++++++++++++++++------- 2 files changed, 108 insertions(+), 29 deletions(-) diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index 713ae9b..a7822ee 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -18,7 +18,7 @@ pub fn run(){ let mut out_message_senders = Vec::new(); - for i in 0..2 { + for i in 0..1 { let in_message_sender = in_message_sender.clone(); let (out_message_sender, out_message_receiver) = ::flume::unbounded(); diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs index e079b3a..620f21c 100644 --- a/aquatic_ws/src/lib/network.rs +++ b/aquatic_ws/src/lib/network.rs @@ -5,6 +5,8 @@ use std::option::Option; use slab::Slab; use tungstenite::WebSocket; +use tungstenite::handshake::{MidHandshake, HandshakeError, server::{ServerHandshake, NoCallback}}; +use indexmap::IndexMap; use mio::{Events, Poll, Interest, Token}; use mio::net::{TcpListener, TcpStream}; @@ -13,6 +15,14 @@ use crate::common::*; use crate::protocol::*; +pub enum Connection { + Stream(TcpStream), + MidHandshake(MidHandshake>), + Established(PeerConnection), + Placeholder +} + + pub struct PeerConnection { pub ws: WebSocket, pub peer_socket_addr: SocketAddr, @@ -37,10 +47,10 @@ pub fn run_socket_worker( let timeout = Duration::from_millis(50); // FIXME: config - let mut connections: Slab> = Slab::new(); + let mut connections: IndexMap = IndexMap::new(); // Insert empty first entry to prevent assignment of index 0 - assert_eq!(connections.insert(None), 0); + assert_eq!(connections.insert_full(0, Connection::Placeholder).0, 0); loop { poll.poll(&mut events, Some(timeout)) @@ -52,7 +62,7 @@ pub fn run_socket_worker( let token = event.token(); if token.0 == 0 { - accept_new_connections( + accept_new_streams( &mut listener, &mut poll, &mut connections, @@ -76,7 +86,7 @@ pub fn run_socket_worker( // messages (which is required after closing anyway.) // // FIXME: peers need to be removed too, wherever they are stored - connections.retain(|_, opt_connection| { +/* connections.retain(|_, opt_connection| { if let Some(connection) = opt_connection { if connection.valid_until.0 < now { connection.ws.close(None).unwrap(); @@ -103,7 +113,7 @@ pub fn run_socket_worker( } true - }); + }); */ send_out_messages( out_message_receiver.drain(), @@ -114,34 +124,22 @@ pub fn run_socket_worker( } -fn accept_new_connections( +fn accept_new_streams( listener: &mut TcpListener, poll: &mut Poll, - connections: &mut Slab>, + connections: &mut IndexMap, valid_until: ValidUntil, ){ loop { match listener.accept(){ Ok((mut stream, src)) => { - let entry = connections.vacant_entry(); - let token = Token(entry.key()); + let token = Token(connections.len()); poll.registry() .register(&mut stream, token, Interest::READABLE) .unwrap(); - - // FIXME: will this cause issues due to blocking? - // Should handshake be started manually below - // instead? - let ws = tungstenite::server::accept(stream).unwrap(); - let peer_connection = PeerConnection { - ws, - peer_socket_addr: src, - valid_until, - }; - - entry.insert(Some(peer_connection)); + connections.insert(token.0, Connection::Stream(stream)); }, Err(err) => { if err.kind() == ErrorKind::WouldBlock { @@ -154,17 +152,98 @@ fn accept_new_connections( } } +#[derive(Clone, Copy, Debug)] +pub struct DebugCallback; + +impl ::tungstenite::handshake::server::Callback for DebugCallback { + fn on_request( + self, + request: &::tungstenite::handshake::server::Request, + response: ::tungstenite::handshake::server::Response, + ) -> Result<::tungstenite::handshake::server::Response, ::tungstenite::handshake::server::ErrorResponse> { + println!("request: {:#?}", request); + + Ok(response) + } +} + pub fn read_and_forward_in_messages( socket_worker_index: usize, in_message_sender: &InMessageSender, poll: &mut Poll, - connections: &mut Slab>, + connections: &mut IndexMap, poll_token: Token, valid_until: ValidUntil, ){ + println!("poll_token: {}", poll_token.0); + loop { - if let Some(Some(connection)) = connections.get_mut(poll_token.0){ + let established = match connections.get_index(poll_token.0){ + Some((_, Connection::Stream(_))) => false, + Some((_, Connection::MidHandshake(_))) => false, + Some((_, Connection::Established(_))) => true, + Some((_, Connection::Placeholder)) => unreachable!(), + None => break, + }; + + if !established { + let conn = connections.remove(&poll_token.0).unwrap(); + + match conn { + Connection::Stream(stream) => { + let peer_socket_addr = stream.peer_addr().unwrap(); + + match ::tungstenite::server::accept_hdr(stream, DebugCallback){ + Ok(ws) => { + println!("handshake established"); + let peer_connection = PeerConnection { + ws, + peer_socket_addr, + valid_until, + }; + + connections.insert(poll_token.0, Connection::Established(peer_connection)); + }, + Err(HandshakeError::Interrupted(handshake)) => { + println!("interrupted"); + connections.insert(poll_token.0, Connection::MidHandshake(handshake)); + + break; + }, + Err(HandshakeError::Failure(err)) => { + eprintln!("handshake: {}", err) + } + } + }, + Connection::MidHandshake(mut handshake) => { + let stream = handshake.get_mut().get_mut(); + let peer_socket_addr = stream.peer_addr().unwrap(); + + match handshake.handshake(){ + Ok(ws) => { + println!("handshake established"); + let peer_connection = PeerConnection { + ws, + peer_socket_addr, + valid_until, + }; + + connections.insert(poll_token.0, Connection::Established(peer_connection)); + }, + Err(HandshakeError::Interrupted(handshake)) => { + connections.insert(poll_token.0, Connection::MidHandshake(handshake)); + + break; + }, + Err(err) => eprintln!("handshake: {}", err), + } + }, + _ => unreachable!(), + } + } else if let Some(Connection::Established(connection)) = connections.get_mut(&poll_token.0){ + println!("conn established"); + match connection.ws.read_message(){ Ok(ws_message) => { if let Some(in_message) = InMessage::from_ws_message(ws_message){ @@ -192,7 +271,7 @@ pub fn read_and_forward_in_messages( .deregister(connection.ws.get_mut()) .unwrap(); - connections.remove(poll_token.0); + connections.remove(&poll_token.0); }, Err(err) => { eprint!("{}", err); @@ -206,14 +285,14 @@ pub fn read_and_forward_in_messages( pub fn send_out_messages( out_message_receiver: ::flume::Drain<(ConnectionMeta, OutMessage)>, poll: &mut Poll, - connections: &mut Slab>, + connections: &mut IndexMap, ){ // Read messages from channel, send to peers for (meta, out_message) in out_message_receiver { let opt_connection = connections - .get_mut(meta.socket_worker_slab_index); + .get_mut(&meta.socket_worker_slab_index); - if let Some(Some(connection)) = opt_connection { + if let Some(Connection::Established(connection)) = opt_connection { if connection.peer_socket_addr != meta.peer_socket_addr { eprintln!("socket worker: peer socket addrs didn't match"); @@ -235,7 +314,7 @@ pub fn send_out_messages( .deregister(connection.ws.get_mut()) .unwrap(); - connections.remove(meta.socket_worker_slab_index); + connections.remove(&meta.socket_worker_slab_index); }, Err(err) => { eprint!("{}", err);