From 846e076b4b8d04db82d5508fc67df69987f78909 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 13 May 2020 19:34:46 +0200 Subject: [PATCH] aquatic_ws: move code into Connection impl, so inner can be private --- aquatic_ws/src/lib/network/common.rs | 46 +++++++++++++++++++++++++++- aquatic_ws/src/lib/network/mod.rs | 36 +++++++++------------- aquatic_ws/src/lib/network/utils.rs | 18 ++--------- 3 files changed, 62 insertions(+), 38 deletions(-) diff --git a/aquatic_ws/src/lib/network/common.rs b/aquatic_ws/src/lib/network/common.rs index c65da9a..c681fe1 100644 --- a/aquatic_ws/src/lib/network/common.rs +++ b/aquatic_ws/src/lib/network/common.rs @@ -183,7 +183,51 @@ impl HandshakeMachine { pub struct Connection { pub valid_until: ValidUntil, - pub inner: Either, + inner: Either, +} + + +impl Connection { + pub fn new( + valid_until: ValidUntil, + inner: Either + ) -> Self { + Self { + valid_until, + inner + } + } + + pub fn is_established(&self) -> bool { + self.inner.is_left() + } + + pub fn get_established_ws<'a>(&mut self) -> Option<&mut EstablishedWs> { + match self.inner { + Either::Left(ref mut ews) => Some(ews), + Either::Right(_) => None, + } + } + + pub fn get_machine(self) -> Option { + match self.inner { + Either::Left(_) => None, + Either::Right(machine) => Some(machine), + } + } + + pub fn close(&mut self){ + if let Either::Left(ref mut ews) = self.inner { + if ews.ws.can_read(){ + ews.ws.close(None).unwrap(); + + // Needs to be done after ws.close() + if let Err(err) = ews.ws.write_pending(){ + dbg!(err); + } + } + } + } } diff --git a/aquatic_ws/src/lib/network/mod.rs b/aquatic_ws/src/lib/network/mod.rs index b813ab4..03bc2fc 100644 --- a/aquatic_ws/src/lib/network/mod.rs +++ b/aquatic_ws/src/lib/network/mod.rs @@ -116,10 +116,10 @@ fn accept_new_streams( .register(&mut stream, token, Interest::READABLE) .unwrap(); - let connection = Connection { + let connection = Connection::new( valid_until, - inner: Either::Right(HandshakeMachine::new(stream)) - }; + Either::Right(HandshakeMachine::new(stream)) + ); connections.insert(token, connection); }, @@ -147,10 +147,9 @@ pub fn run_handshakes_and_read_messages( valid_until: ValidUntil, ){ loop { - if let Some(Connection { - inner: Either::Left(established_ws), - .. - }) = connections.get_mut(&poll_token){ + if let Some(established_ws) = connections.get_mut(&poll_token) + .and_then(Connection::get_established_ws) + { use ::tungstenite::Error::Io; match established_ws.ws.read_message(){ @@ -180,18 +179,14 @@ pub fn run_handshakes_and_read_messages( break; } } - } else if let Some(Connection { - inner: Either::Right(machine), - .. - }) = connections.remove(&poll_token) { + } else if let Some(machine) = connections.remove(&poll_token) + .and_then(Connection::get_machine) + { let (result, stop_loop) = machine .advance(opt_tls_acceptor); if let Some(inner) = result { - let connection = Connection { - valid_until, - inner, - }; + let connection = Connection::new(valid_until, inner); connections.insert(poll_token, connection); } @@ -210,12 +205,11 @@ pub fn send_out_messages( connections: &mut ConnectionMap, ){ for (meta, out_message) in out_message_receiver { - let opt_inner = connections - .get_mut(&meta.poll_token) - .map(|v| &mut v.inner); + let opt_established_ws = connections.get_mut(&meta.poll_token) + .and_then(Connection::get_established_ws); - if let Some(Either::Left(connection)) = opt_inner { - if connection.peer_addr != meta.peer_addr { + if let Some(established_ws) = opt_established_ws { + if established_ws.peer_addr != meta.peer_addr { eprintln!("socket worker: peer socket addrs didn't match"); continue; @@ -225,7 +219,7 @@ pub fn send_out_messages( use ::tungstenite::Error::Io; - match connection.ws.write_message(out_message.to_ws_message()){ + match established_ws.ws.write_message(out_message.to_ws_message()){ Ok(()) => {}, Err(Io(err)) if err.kind() == ErrorKind::WouldBlock => { continue; diff --git a/aquatic_ws/src/lib/network/utils.rs b/aquatic_ws/src/lib/network/utils.rs index 1e370aa..9f7dfb1 100644 --- a/aquatic_ws/src/lib/network/utils.rs +++ b/aquatic_ws/src/lib/network/utils.rs @@ -2,7 +2,6 @@ use std::fs::File; use std::io::Read; use std::time::Instant; -use either::Either; use mio::Token; use native_tls::{Identity, TlsAcceptor}; use net2::{TcpBuilder, unix::UnixTcpBuilderExt}; @@ -58,19 +57,6 @@ pub fn create_tls_acceptor( } -pub fn close_connection(connection: &mut Connection){ - if let Either::Left(ref mut ews) = connection.inner { - if ews.ws.can_read(){ - ews.ws.close(None).unwrap(); - - // Needs to be done after ws.close() - if let Err(err) = ews.ws.write_pending(){ - dbg!(err); - } - } - } -} - /// Don't bother with deregistering from Poll. In my understanding, this is /// done automatically when the stream is dropped, as long as there are no @@ -81,7 +67,7 @@ pub fn remove_connection_if_exists( token: Token, ){ if let Some(mut connection) = connections.remove(&token){ - close_connection(&mut connection); + connection.close(); } } @@ -94,7 +80,7 @@ pub fn remove_inactive_connections( connections.retain(|_, connection| { if connection.valid_until.0 < now { - close_connection(connection); + connection.close(); println!("closing connection, it is inactive");