diff --git a/aquatic_ws/src/lib/network/common.rs b/aquatic_ws/src/lib/network/common.rs index f950376..4bc6318 100644 --- a/aquatic_ws/src/lib/network/common.rs +++ b/aquatic_ws/src/lib/network/common.rs @@ -4,7 +4,7 @@ use std::io::{Read, Write}; use hashbrown::HashMap; use mio::Token; use mio::net::TcpStream; -use native_tls::TlsStream; +use native_tls::{TlsStream, MidHandshakeTlsStream}; use tungstenite::WebSocket; use tungstenite::handshake::{MidHandshake, server::ServerHandshake}; @@ -78,9 +78,10 @@ pub struct EstablishedWs { pub enum ConnectionStage { - Stream(Stream), - TlsMidHandshake(native_tls::MidHandshakeTlsStream), - WsHandshake(MidHandshake>), + TcpStream(TcpStream), + TlsStream(TlsStream), + TlsMidHandshake(MidHandshakeTlsStream), + WsMidHandshake(MidHandshake>), EstablishedWs(EstablishedWs), } diff --git a/aquatic_ws/src/lib/network/mod.rs b/aquatic_ws/src/lib/network/mod.rs index e1a28bc..ab21a35 100644 --- a/aquatic_ws/src/lib/network/mod.rs +++ b/aquatic_ws/src/lib/network/mod.rs @@ -4,10 +4,10 @@ use std::io::ErrorKind; use tungstenite::WebSocket; use tungstenite::handshake::{HandshakeError, server::ServerHandshake}; use hashbrown::HashMap; -use native_tls::{TlsAcceptor, TlsStream}; +use native_tls::TlsAcceptor; use mio::{Events, Poll, Interest, Token}; -use mio::net::{TcpListener, TcpStream}; +use mio::net::TcpListener; use crate::common::*; use crate::config::Config; @@ -119,12 +119,10 @@ fn accept_new_streams( poll.registry() .register(&mut stream, token, Interest::READABLE) .unwrap(); - - let stream = Stream::TcpStream(stream); let connection = Connection { valid_until, - stage: ConnectionStage::Stream(stream) + stage: ConnectionStage::TcpStream(stream) }; connections.insert(token, connection); @@ -172,7 +170,7 @@ pub fn handle_ws_handshake_result( let connection = Connection { valid_until, - stage: ConnectionStage::WsHandshake(handshake), + stage: ConnectionStage::WsMidHandshake(handshake), }; connections.insert(poll_token, connection); @@ -188,64 +186,6 @@ pub fn handle_ws_handshake_result( } -// Macro hack to not have to write the following twice in -// `run_handshakes_and_read_messages` (putting it in a function causes error -// because of multiple mutable references) -macro_rules! read_ws_messages { - ( - $socket_worker_index: ident, - $in_message_sender: ident, - $poll: ident, - $connections: ident, - $poll_token: ident, - $established_ws: ident - ) => { - println!("conn established"); - - match $established_ws.ws.read_message(){ - Ok(ws_message) => { - dbg!(ws_message.clone()); - - if let Some(in_message) = InMessage::from_ws_message(ws_message){ - dbg!(in_message.clone()); - - let meta = ConnectionMeta { - worker_index: $socket_worker_index, - poll_token: $poll_token, - peer_addr: $established_ws.peer_addr - }; - - $in_message_sender.send((meta, in_message)); - } - }, - Err(tungstenite::Error::Io(err)) => { - if err.kind() == ErrorKind::WouldBlock { - break; - } - - remove_connection_if_exists($poll, $connections, $poll_token); - - eprint!("{}", err); - - break; - }, - Err(tungstenite::Error::ConnectionClosed) => { - remove_connection_if_exists($poll, $connections, $poll_token); - - break; - }, - Err(err) => { - dbg!(err); - - remove_connection_if_exists($poll, $connections, $poll_token); - - break; - } - } - }; -} - - /// Get TLS (if requested) and tungstenite up and running, then read messages pub fn run_handshakes_and_read_messages( socket_worker_index: usize, @@ -259,16 +199,18 @@ pub fn run_handshakes_and_read_messages( println!("poll_token: {}", poll_token.0); loop { - let established = match connections.get(&poll_token).map(|c| &c.stage){ - Some(stage) => stage.is_established(), - None => break, + let established = if let Some(c) = connections.get(&poll_token){ + c.stage.is_established() + } else { + // Connection is not present, so it is closed. Stop processing + break; }; if !established { let conn = connections.remove(&poll_token).unwrap(); match conn.stage { - ConnectionStage::Stream(Stream::TcpStream(stream)) => { + ConnectionStage::TcpStream(stream) => { if let Some(tls_acceptor) = opt_tls_acceptor { match tls_acceptor.accept(stream){ Ok(stream) => { @@ -276,7 +218,7 @@ pub fn run_handshakes_and_read_messages( let connection = Connection { valid_until, - stage: ConnectionStage::Stream(Stream::TlsStream(stream)) + stage: ConnectionStage::TlsStream(stream) }; connections.insert(poll_token, connection); @@ -315,7 +257,7 @@ pub fn run_handshakes_and_read_messages( } } }, - ConnectionStage::Stream(Stream::TlsStream(stream)) => { + ConnectionStage::TlsStream(stream) => { let handshake_result = ::tungstenite::server::accept_hdr( Stream::TlsStream(stream), DebugCallback @@ -339,7 +281,7 @@ pub fn run_handshakes_and_read_messages( let connection = Connection { valid_until, - stage: ConnectionStage::Stream(Stream::TlsStream(stream)) + stage: ConnectionStage::TlsStream(stream) }; connections.insert(poll_token, connection); @@ -361,7 +303,7 @@ pub fn run_handshakes_and_read_messages( } } }, - ConnectionStage::WsHandshake(handshake) => { + ConnectionStage::WsMidHandshake(handshake) => { let stop_loop = handle_ws_handshake_result( connections, poll_token, @@ -375,22 +317,49 @@ pub fn run_handshakes_and_read_messages( }, ConnectionStage::EstablishedWs(_) => unreachable!(), } - } else { - match connections.get_mut(&poll_token){ - Some(Connection{ - stage: ConnectionStage::EstablishedWs(established_ws), - .. - }) => { - read_ws_messages!( - socket_worker_index, - in_message_sender, - poll, - connections, - poll_token, - established_ws - ); + } else if let Some(Connection { + stage: ConnectionStage::EstablishedWs(established_ws), + .. + }) = connections.get_mut(&poll_token){ + match established_ws.ws.read_message(){ + Ok(ws_message) => { + dbg!(ws_message.clone()); + + if let Some(in_message) = InMessage::from_ws_message(ws_message){ + dbg!(in_message.clone()); + + let meta = ConnectionMeta { + worker_index: socket_worker_index, + poll_token: poll_token, + peer_addr: established_ws.peer_addr + }; + + in_message_sender.send((meta, in_message)); + } }, - _ => () + Err(tungstenite::Error::Io(err)) => { + if err.kind() == ErrorKind::WouldBlock { + break; + } + + remove_connection_if_exists(poll, connections, poll_token); + + eprint!("{}", err); + + break; + }, + Err(tungstenite::Error::ConnectionClosed) => { + remove_connection_if_exists(poll, connections, poll_token); + + break; + }, + Err(err) => { + dbg!(err); + + remove_connection_if_exists(poll, connections, poll_token); + + break; + } } } } diff --git a/aquatic_ws/src/lib/network/utils.rs b/aquatic_ws/src/lib/network/utils.rs index ba8feda..05ba2df 100644 --- a/aquatic_ws/src/lib/network/utils.rs +++ b/aquatic_ws/src/lib/network/utils.rs @@ -63,13 +63,16 @@ pub fn close_and_deregister_connection( connection: &mut Connection, ){ match connection.stage { - ConnectionStage::Stream(ref mut stream) => { + ConnectionStage::TcpStream(ref mut stream) => { /* poll.registry() .deregister(stream) .unwrap(); */ }, + ConnectionStage::TlsStream(ref mut stream) => { + + } ConnectionStage::TlsMidHandshake(ref mut handshake) => { /* poll.registry() @@ -77,7 +80,7 @@ pub fn close_and_deregister_connection( .unwrap(); */ }, - ConnectionStage::WsHandshake(ref mut handshake) => { + ConnectionStage::WsMidHandshake(ref mut handshake) => { /* poll.registry() .deregister(handshake.get_mut().get_mut())