From 012ccd7ec1960fa2afabea86e05ce9304574c7f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 13 May 2020 15:03:02 +0200 Subject: [PATCH] aquatic_ws: simplify network code --- aquatic_ws/src/lib/network/mod.rs | 62 +++++++++++--------------- aquatic_ws/src/lib/network/utils.rs | 67 +++++++---------------------- 2 files changed, 42 insertions(+), 87 deletions(-) diff --git a/aquatic_ws/src/lib/network/mod.rs b/aquatic_ws/src/lib/network/mod.rs index ab21a35..b63a9a5 100644 --- a/aquatic_ws/src/lib/network/mod.rs +++ b/aquatic_ws/src/lib/network/mod.rs @@ -72,7 +72,6 @@ pub fn run_socket_worker( socket_worker_index, &in_message_sender, &opt_tls_acceptor, - &mut poll, &mut connections, token, valid_until, @@ -82,13 +81,12 @@ pub fn run_socket_worker( send_out_messages( out_message_receiver.drain(), - &mut poll, &mut connections ); // Remove inactive connections, but not every iteration if iter_counter % 128 == 0 { - remove_inactive_connections(&mut poll, &mut connections); + remove_inactive_connections(&mut connections); } iter_counter = iter_counter.wrapping_add(1); @@ -114,7 +112,7 @@ fn accept_new_streams( let token = *poll_token_counter; - remove_connection_if_exists(poll, connections, token); + remove_connection_if_exists(connections, token); poll.registry() .register(&mut stream, token, Interest::READABLE) @@ -191,7 +189,6 @@ pub fn run_handshakes_and_read_messages( socket_worker_index: usize, in_message_sender: &InMessageSender, opt_tls_acceptor: &Option, // If set, run TLS - poll: &mut Poll, connections: &mut ConnectionMap, poll_token: Token, valid_until: ValidUntil, @@ -202,7 +199,6 @@ pub fn run_handshakes_and_read_messages( let established = if let Some(c) = connections.get(&poll_token){ c.stage.is_established() } else { - // Connection is not present, so it is closed. Stop processing break; }; @@ -342,21 +338,21 @@ pub fn run_handshakes_and_read_messages( break; } - remove_connection_if_exists(poll, connections, poll_token); + remove_connection_if_exists(connections, poll_token); eprint!("{}", err); break; }, Err(tungstenite::Error::ConnectionClosed) => { - remove_connection_if_exists(poll, connections, poll_token); + remove_connection_if_exists(connections, poll_token); break; }, Err(err) => { dbg!(err); - remove_connection_if_exists(poll, connections, poll_token); + remove_connection_if_exists(connections, poll_token); break; } @@ -369,7 +365,6 @@ pub fn run_handshakes_and_read_messages( /// Read messages from channel, send to peers pub fn send_out_messages( out_message_receiver: ::flume::Drain<(ConnectionMeta, OutMessage)>, - poll: &mut Poll, connections: &mut ConnectionMap, ){ for (meta, out_message) in out_message_receiver { @@ -377,36 +372,31 @@ pub fn send_out_messages( .get_mut(&meta.poll_token) .map(|v| &mut v.stage); - use ::tungstenite::Error::Io; + if let Some(ConnectionStage::EstablishedWs(connection)) = opt_stage { + if connection.peer_addr != meta.peer_addr { + eprintln!("socket worker: peer socket addrs didn't match"); + + continue; + } + + dbg!(out_message.clone()); - // Exactly the same for both established stages - match opt_stage { - Some(ConnectionStage::EstablishedWs(connection)) => { - if connection.peer_addr != meta.peer_addr { - eprintln!("socket worker: peer socket addrs didn't match"); + use ::tungstenite::Error::Io; + match connection.ws.write_message(out_message.to_ws_message()){ + Ok(()) => {}, + Err(Io(err)) if err.kind() == ErrorKind::WouldBlock => { continue; - } + }, + Err(err) => { + dbg!(err); - dbg!(out_message.clone()); - - match connection.ws.write_message(out_message.to_ws_message()){ - Ok(()) => {}, - Err(Io(err)) if err.kind() == ErrorKind::WouldBlock => { - continue; - }, - Err(err) => { - dbg!(err); - - remove_connection_if_exists( - poll, - connections, - meta.poll_token - ); - }, - } - }, - _ => {}, + remove_connection_if_exists( + connections, + meta.poll_token + ); + }, + } } } } \ No newline at end of file diff --git a/aquatic_ws/src/lib/network/utils.rs b/aquatic_ws/src/lib/network/utils.rs index 05ba2df..218d695 100644 --- a/aquatic_ws/src/lib/network/utils.rs +++ b/aquatic_ws/src/lib/network/utils.rs @@ -2,7 +2,7 @@ use std::fs::File; use std::io::Read; use std::time::Instant; -use mio::{Poll, Token}; +use mio::Token; use native_tls::{Identity, TlsAcceptor}; use net2::{TcpBuilder, unix::UnixTcpBuilderExt}; @@ -57,78 +57,43 @@ pub fn create_tls_acceptor( } -/// FIXME -pub fn close_and_deregister_connection( - poll: &mut Poll, - connection: &mut Connection, -){ - match connection.stage { - ConnectionStage::TcpStream(ref mut stream) => { - /* - poll.registry() - .deregister(stream) - .unwrap(); - */ - }, - ConnectionStage::TlsStream(ref mut stream) => { +pub fn close_connection(connection: &mut Connection){ + if let ConnectionStage::EstablishedWs(ref mut ews) = connection.stage { + if ews.ws.can_read(){ + ews.ws.close(None).unwrap(); - } - ConnectionStage::TlsMidHandshake(ref mut handshake) => { - /* - poll.registry() - .deregister(handshake.get_mut()) - .unwrap(); - */ - }, - ConnectionStage::WsMidHandshake(ref mut handshake) => { - /* - poll.registry() - .deregister(handshake.get_mut().get_mut()) - .unwrap(); - */ - }, - ConnectionStage::EstablishedWs(ref mut established_ws) => { - if established_ws.ws.can_read(){ - established_ws.ws.close(None).unwrap(); - - // Needs to be done after ws.close() - if let Err(err) = established_ws.ws.write_pending(){ - dbg!(err); - } + // Needs to be done after ws.close() + if let Err(err) = ews.ws.write_pending(){ + dbg!(err); } - - /* - poll.registry() - .deregister(established_ws.ws.get_mut()) - .unwrap(); - */ - }, + } } } +/// Don't bother with deregistering from Poll. In my understanding, this is +/// done automatically when the stream is dropped, as long as there are no +/// other references to the file descriptor, such as when it is accessed +/// in multiple threads. pub fn remove_connection_if_exists( - poll: &mut Poll, connections: &mut ConnectionMap, token: Token, ){ if let Some(mut connection) = connections.remove(&token){ - close_and_deregister_connection(poll, &mut connection); - - connections.remove(&token); + close_connection(&mut connection); } } + // Close and remove inactive connections pub fn remove_inactive_connections( - poll: &mut Poll, connections: &mut ConnectionMap, ){ let now = Instant::now(); connections.retain(|_, connection| { if connection.valid_until.0 < now { - close_and_deregister_connection(poll, connection); + close_connection(connection); println!("closing connection, it is inactive");