From 6e0b0ccbc279f93a4b503fe332279253a9073465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 12 May 2020 20:31:07 +0200 Subject: [PATCH] WIP: aquatic_ws: network: rename PeerConnection to EstablishedWs --- aquatic_ws/src/lib/lib.rs | 2 +- aquatic_ws/src/lib/network.rs | 80 ++++++++++++++++------------------- 2 files changed, 38 insertions(+), 44 deletions(-) diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index 844b45c..515e79b 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -35,7 +35,7 @@ pub fn run(config: Config){ i, in_message_sender, out_message_receiver, - true + false ); }); } diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs index 6ca011a..fde3eaa 100644 --- a/aquatic_ws/src/lib/network.rs +++ b/aquatic_ws/src/lib/network.rs @@ -21,10 +21,9 @@ use crate::protocol::*; pub type Stream = TlsStream; -pub struct PeerConnection { +pub struct EstablishedWs { pub ws: WebSocket, pub peer_addr: SocketAddr, - pub valid_until: ValidUntil, } @@ -34,8 +33,8 @@ pub enum ConnectionStage { TlsStream(Stream), WsHandshakeNoTls(MidHandshake>), WsHandshakeTls(MidHandshake>), - EstablishedNoTls(PeerConnection), - EstablishedTls(PeerConnection), + EstablishedWsNoTls(EstablishedWs), + EstablishedWsTls(EstablishedWs), } @@ -95,32 +94,32 @@ fn close_and_deregister_connection( .deregister(handshake.get_mut().get_mut().get_mut()) .unwrap(); }, - ConnectionStage::EstablishedNoTls(ref mut peer_connection) => { - if peer_connection.ws.can_read(){ - peer_connection.ws.close(None).unwrap(); + ConnectionStage::EstablishedWsNoTls(ref mut established_ws) => { + if established_ws.ws.can_read(){ + established_ws.ws.close(None).unwrap(); // Needs to be done after ws.close() - if let Err(err) = peer_connection.ws.write_pending(){ + if let Err(err) = established_ws.ws.write_pending(){ dbg!(err); } } poll.registry() - .deregister(peer_connection.ws.get_mut()) + .deregister(established_ws.ws.get_mut()) .unwrap(); }, - ConnectionStage::EstablishedTls(ref mut peer_connection) => { - if peer_connection.ws.can_read(){ - peer_connection.ws.close(None).unwrap(); + ConnectionStage::EstablishedWsTls(ref mut established_ws) => { + if established_ws.ws.can_read(){ + established_ws.ws.close(None).unwrap(); // Needs to be done after ws.close() - if let Err(err) = peer_connection.ws.write_pending(){ + if let Err(err) = established_ws.ws.write_pending(){ dbg!(err); } } poll.registry() - .deregister(peer_connection.ws.get_mut().get_mut()) + .deregister(established_ws.ws.get_mut().get_mut()) .unwrap(); }, } @@ -230,7 +229,11 @@ pub fn run_socket_worker( .register(&mut listener, Token(0), Interest::READABLE) .unwrap(); - let tls_acceptor = create_tls_acceptor(&config); + let opt_tls_acceptor = if use_tls { + Some(create_tls_acceptor(&config)) + } else { + None + }; let mut connections: ConnectionMap = HashMap::new(); @@ -258,12 +261,11 @@ pub fn run_socket_worker( run_handshakes_and_read_messages( socket_worker_index, &in_message_sender, - &tls_acceptor, + &opt_tls_acceptor, &mut poll, &mut connections, token, valid_until, - use_tls ); } } @@ -380,15 +382,14 @@ pub fn handle_ws_handshake_no_tls_result( let peer_addr = ws.get_mut().peer_addr().unwrap(); - let peer_connection = PeerConnection { + let established_ws = EstablishedWs { ws, peer_addr, - valid_until, }; let connection = Connection { valid_until, - stage: ConnectionStage::EstablishedNoTls(peer_connection) + stage: ConnectionStage::EstablishedWsNoTls(established_ws) }; connections.insert(poll_token, connection); @@ -428,15 +429,14 @@ pub fn handle_ws_handshake_tls_result( let peer_addr = ws.get_mut().get_mut().peer_addr().unwrap(); - let peer_connection = PeerConnection { + let established_ws = EstablishedWs { ws, peer_addr, - valid_until, }; let connection = Connection { valid_until, - stage: ConnectionStage::EstablishedTls(peer_connection) + stage: ConnectionStage::EstablishedWsTls(established_ws) }; connections.insert(poll_token, connection); @@ -473,12 +473,11 @@ macro_rules! read_ws_messages { $poll: ident, $connections: ident, $poll_token: ident, - $valid_until: ident, - $peer_connection: ident + $established_ws: ident ) => { println!("conn established"); - match $peer_connection.ws.read_message(){ + match $established_ws.ws.read_message(){ Ok(ws_message) => { dbg!(ws_message.clone()); @@ -488,13 +487,11 @@ macro_rules! read_ws_messages { let meta = ConnectionMeta { worker_index: $socket_worker_index, poll_token: $poll_token, - peer_addr: $peer_connection.peer_addr + peer_addr: $established_ws.peer_addr }; $in_message_sender.send((meta, in_message)); } - - $peer_connection.valid_until = $valid_until; }, Err(tungstenite::Error::Io(err)) => { if err.kind() == ErrorKind::WouldBlock { @@ -527,19 +524,18 @@ macro_rules! read_ws_messages { pub fn run_handshakes_and_read_messages( socket_worker_index: usize, in_message_sender: &InMessageSender, - tls_acceptor: &TlsAcceptor, + opt_tls_acceptor: &Option, // If set, run TLS poll: &mut Poll, connections: &mut ConnectionMap, poll_token: Token, valid_until: ValidUntil, - use_tls: bool ){ println!("poll_token: {}", poll_token.0); loop { let established = match connections.get(&poll_token).map(|c| &c.stage){ - Some(ConnectionStage::EstablishedTls(_)) => true, - Some(ConnectionStage::EstablishedNoTls(_)) => true, + Some(ConnectionStage::EstablishedWsTls(_)) => true, + Some(ConnectionStage::EstablishedWsNoTls(_)) => true, Some(_) => false, None => break, }; @@ -549,7 +545,7 @@ pub fn run_handshakes_and_read_messages( match conn.stage { ConnectionStage::TcpStream(stream) => { - if use_tls { + if let Some(tls_acceptor) = opt_tls_acceptor { let stop_loop = handle_tls_handshake_result( connections, poll_token, @@ -631,13 +627,13 @@ pub fn run_handshakes_and_read_messages( break; } }, - ConnectionStage::EstablishedNoTls(_) => unreachable!(), - ConnectionStage::EstablishedTls(_) => unreachable!(), + ConnectionStage::EstablishedWsNoTls(_) => unreachable!(), + ConnectionStage::EstablishedWsTls(_) => unreachable!(), } } else { match connections.get_mut(&poll_token){ Some(Connection{ - stage: ConnectionStage::EstablishedNoTls(peer_connection), + stage: ConnectionStage::EstablishedWsNoTls(established_ws), .. }) => { read_ws_messages!( @@ -646,12 +642,11 @@ pub fn run_handshakes_and_read_messages( poll, connections, poll_token, - valid_until, - peer_connection + established_ws ); }, Some(Connection{ - stage: ConnectionStage::EstablishedTls(peer_connection), + stage: ConnectionStage::EstablishedWsTls(established_ws), .. }) => { read_ws_messages!( @@ -660,8 +655,7 @@ pub fn run_handshakes_and_read_messages( poll, connections, poll_token, - valid_until, - peer_connection + established_ws ); }, _ => () @@ -682,7 +676,7 @@ pub fn send_out_messages( .get_mut(&meta.poll_token) .map(|v| &mut v.stage); - if let Some(ConnectionStage::EstablishedTls(connection)) = opt_connection { + if let Some(ConnectionStage::EstablishedWsTls(connection)) = opt_connection { if connection.peer_addr != meta.peer_addr { eprintln!("socket worker: peer socket addrs didn't match");