diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs index 8c40136..29be704 100644 --- a/aquatic_ws/src/lib/network.rs +++ b/aquatic_ws/src/lib/network.rs @@ -1,7 +1,6 @@ use std::net::{SocketAddr}; use std::time::{Duration, Instant}; use std::io::ErrorKind; -use std::option::Option; use tungstenite::WebSocket; use tungstenite::handshake::{MidHandshake, HandshakeError, server::{ServerHandshake, NoCallback}}; @@ -15,7 +14,7 @@ use crate::protocol::*; pub struct Connection { - valid_until: Option, + valid_until: ValidUntil, stage: ConnectionStage, } @@ -54,32 +53,78 @@ impl ::tungstenite::handshake::server::Callback for DebugCallback { } +// Close and remove inactive connections +pub fn remove_inactive_connections( + poll: &mut Poll, + connections: &mut ConnectionMap, +){ + let now = Instant::now(); + + connections.retain(|_, connection| { + if connection.valid_until.0 < now { + match connection.stage { + ConnectionStage::Stream(ref mut stream) => { + poll.registry() + .deregister(stream) + .unwrap(); + }, + ConnectionStage::MidHandshake(ref mut handshake) => { + poll.registry() + .deregister(handshake.get_mut().get_mut()) + .unwrap(); + }, + ConnectionStage::Established(ref mut peer_connection) => { + peer_connection.ws.close(None).unwrap(); + + // Needs to be done after ws.close() + if let Err(err) = peer_connection.ws.write_pending(){ + dbg!(err); + } + + poll.registry() + .deregister(peer_connection.ws.get_mut()) + .unwrap(); + }, + } + + println!("closing connection, it is inactive"); + + false + } else { + println!("keeping connection, it is still active"); + + true + } + }); +} + + pub fn run_socket_worker( address: SocketAddr, socket_worker_index: usize, in_message_sender: InMessageSender, out_message_receiver: OutMessageReceiver, ){ + let poll_timeout = Duration::from_millis(50); // FIXME: config + let mut listener = TcpListener::bind(address).unwrap(); let mut poll = Poll::new().expect("create poll"); + let mut events = Events::with_capacity(1024); // FIXME: config poll.registry() .register(&mut listener, Token(0), Interest::READABLE) .unwrap(); - let mut events = Events::with_capacity(1024); // FIXME: config - - let timeout = Duration::from_millis(50); // FIXME: config - let mut connections: ConnectionMap = HashMap::new(); let mut poll_token_counter = Token(0usize); + let mut iter_counter = 0usize; loop { - poll.poll(&mut events, Some(timeout)) + poll.poll(&mut events, Some(poll_timeout)) .expect("failed polling"); - let valid_until = ValidUntil::new(600); + let valid_until = ValidUntil::new(600); // FIXME: config for event in events.iter(){ let token = event.token(); @@ -104,46 +149,18 @@ pub fn run_socket_worker( } } - let now = Instant::now(); - - // Close connections after some time of inactivity and write pending - // messages (which is required after closing anyway.) - // - // FIXME: peers need to be removed too, wherever they are stored -/* connections.retain(|_, opt_connection| { - if let Some(connection) = opt_connection { - if connection.valid_until.0 < now { - connection.ws.close(None).unwrap(); - } - - loop { - match connection.ws.write_pending(){ - Err(tungstenite::Error::Io(err)) => { - if err.kind() == ErrorKind::WouldBlock { - break - } - }, - Err(tungstenite::Error::ConnectionClosed) => { - // FIXME: necessary? - poll.registry() - .deregister(connection.ws.get_mut()) - .unwrap(); - - return false; - }, - _ => {} - } - } - } - - true - }); */ - send_out_messages( out_message_receiver.drain(), &mut poll, &mut connections ); + + // Remove inactive connections, but not every iteration + if iter_counter % 128 == 0 { + remove_inactive_connections(&mut poll, &mut connections); + } + + iter_counter = iter_counter.wrapping_add(1); } } @@ -203,7 +220,7 @@ fn accept_new_streams( .unwrap(); let connection = Connection { - valid_until: Some(valid_until), + valid_until, stage: ConnectionStage::Stream(stream) }; @@ -240,7 +257,7 @@ pub fn handle_handshake_result( }; let connection = Connection { - valid_until: Some(valid_until), + valid_until, stage: ConnectionStage::Established(peer_connection) }; @@ -252,7 +269,7 @@ pub fn handle_handshake_result( println!("interrupted"); let connection = Connection { - valid_until: Some(valid_until), + valid_until, stage: ConnectionStage::MidHandshake(handshake), };