diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs index 322bcb2..e079b3a 100644 --- a/aquatic_ws/src/lib/network.rs +++ b/aquatic_ws/src/lib/network.rs @@ -16,7 +16,7 @@ use crate::protocol::*; pub struct PeerConnection { pub ws: WebSocket, pub peer_socket_addr: SocketAddr, - pub valid_until: Instant, + pub valid_until: ValidUntil, } @@ -33,9 +33,9 @@ pub fn run_socket_worker( .register(&mut listener, Token(0), Interest::READABLE) .unwrap(); - let mut events = Events::with_capacity(1024); + let mut events = Events::with_capacity(1024); // FIXME: config - let timeout = Duration::from_millis(50); + let timeout = Duration::from_millis(50); // FIXME: config let mut connections: Slab> = Slab::new(); @@ -46,82 +46,27 @@ pub fn run_socket_worker( poll.poll(&mut events, Some(timeout)) .expect("failed polling"); - let valid_until = Instant::now() + Duration::from_secs(600); + let valid_until = ValidUntil::new(600); for event in events.iter(){ let token = event.token(); if token.0 == 0 { - loop { - match listener.accept(){ - Ok((mut stream, src)) => { - let entry = connections.vacant_entry(); - let token = Token(entry.key()); - - poll.registry() - .register(&mut stream, token, Interest::READABLE) - .unwrap(); - - // FIXME: will this cause issues due to blocking? - // Should handshake be started manually below - // instead? - let ws = tungstenite::server::accept(stream).unwrap(); - - let peer_connection = PeerConnection { - ws, - peer_socket_addr: src, - valid_until, - }; - - entry.insert(Some(peer_connection)); - }, - Err(err) => { - if err.kind() == ErrorKind::WouldBlock { - break - } - - eprint!("{}", err); - } - } - } + accept_new_connections( + &mut listener, + &mut poll, + &mut connections, + valid_until + ); } else if event.is_readable(){ - loop { - if let Some(Some(connection)) = connections.get_mut(token.0){ - match connection.ws.read_message(){ - Ok(ws_message) => { - if let Some(in_message) = InMessage::from_ws_message(ws_message){ - let meta = ConnectionMeta { - socket_worker_index, - socket_worker_slab_index: token.0, - peer_socket_addr: connection.peer_socket_addr - }; - - in_message_sender.send((meta, in_message)); - } - - connection.valid_until = valid_until; - }, - Err(tungstenite::Error::Io(err)) => { - if err.kind() == ErrorKind::WouldBlock { - break - } - - eprint!("{}", err); - }, - Err(tungstenite::Error::ConnectionClosed) => { - // FIXME: necessary? - poll.registry() - .deregister(connection.ws.get_mut()) - .unwrap(); - - connections.remove(token.0); - }, - Err(err) => { - eprint!("{}", err); - } - } - } - } + read_and_forward_in_messages( + socket_worker_index, + &in_message_sender, + &mut poll, + &mut connections, + token, + valid_until + ); } } @@ -133,7 +78,7 @@ pub fn run_socket_worker( // FIXME: peers need to be removed too, wherever they are stored connections.retain(|_, opt_connection| { if let Some(connection) = opt_connection { - if connection.valid_until < now { + if connection.valid_until.0 < now { connection.ws.close(None).unwrap(); } @@ -160,40 +105,142 @@ pub fn run_socket_worker( true }); - // Read messages from channel, send to peers - for (meta, out_message) in out_message_receiver.drain(){ - let opt_connection = connections - .get_mut(meta.socket_worker_slab_index); + send_out_messages( + out_message_receiver.drain(), + &mut poll, + &mut connections + ); + } +} - if let Some(Some(connection)) = opt_connection { - if connection.peer_socket_addr != meta.peer_socket_addr { - eprintln!("socket worker: peer socket addrs didn't match"); - continue; +fn accept_new_connections( + listener: &mut TcpListener, + poll: &mut Poll, + connections: &mut Slab>, + valid_until: ValidUntil, +){ + loop { + match listener.accept(){ + Ok((mut stream, src)) => { + let entry = connections.vacant_entry(); + let token = Token(entry.key()); + + poll.registry() + .register(&mut stream, token, Interest::READABLE) + .unwrap(); + + // FIXME: will this cause issues due to blocking? + // Should handshake be started manually below + // instead? + let ws = tungstenite::server::accept(stream).unwrap(); + + let peer_connection = PeerConnection { + ws, + peer_socket_addr: src, + valid_until, + }; + + entry.insert(Some(peer_connection)); + }, + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + break } - match connection.ws.write_message(out_message.to_ws_message()){ - Ok(()) => {}, - Err(tungstenite::Error::Io(err)) => { - if err.kind() == ErrorKind::WouldBlock { - continue; - } + eprint!("{}", err); + } + } + } +} - eprint!("{}", err); - }, - Err(tungstenite::Error::ConnectionClosed) => { - // FIXME: necessary? - poll.registry() - .deregister(connection.ws.get_mut()) - .unwrap(); - connections.remove(meta.socket_worker_slab_index); - }, - Err(err) => { - eprint!("{}", err); - }, +pub fn read_and_forward_in_messages( + socket_worker_index: usize, + in_message_sender: &InMessageSender, + poll: &mut Poll, + connections: &mut Slab>, + poll_token: Token, + valid_until: ValidUntil, +){ + loop { + if let Some(Some(connection)) = connections.get_mut(poll_token.0){ + match connection.ws.read_message(){ + Ok(ws_message) => { + if let Some(in_message) = InMessage::from_ws_message(ws_message){ + let meta = ConnectionMeta { + socket_worker_index, + socket_worker_slab_index: poll_token.0, + peer_socket_addr: connection.peer_socket_addr + }; + + in_message_sender.send((meta, in_message)); + } + + connection.valid_until = valid_until; + }, + Err(tungstenite::Error::Io(err)) => { + if err.kind() == ErrorKind::WouldBlock { + break + } + + eprint!("{}", err); + }, + Err(tungstenite::Error::ConnectionClosed) => { + // FIXME: necessary? + poll.registry() + .deregister(connection.ws.get_mut()) + .unwrap(); + + connections.remove(poll_token.0); + }, + Err(err) => { + eprint!("{}", err); } } } } +} + + +pub fn send_out_messages( + out_message_receiver: ::flume::Drain<(ConnectionMeta, OutMessage)>, + poll: &mut Poll, + connections: &mut Slab>, +){ + // Read messages from channel, send to peers + for (meta, out_message) in out_message_receiver { + let opt_connection = connections + .get_mut(meta.socket_worker_slab_index); + + if let Some(Some(connection)) = opt_connection { + if connection.peer_socket_addr != meta.peer_socket_addr { + eprintln!("socket worker: peer socket addrs didn't match"); + + continue; + } + + match connection.ws.write_message(out_message.to_ws_message()){ + Ok(()) => {}, + Err(tungstenite::Error::Io(err)) => { + if err.kind() == ErrorKind::WouldBlock { + continue; + } + + eprint!("{}", err); + }, + Err(tungstenite::Error::ConnectionClosed) => { + // FIXME: necessary? + poll.registry() + .deregister(connection.ws.get_mut()) + .unwrap(); + + connections.remove(meta.socket_worker_slab_index); + }, + Err(err) => { + eprint!("{}", err); + }, + } + } + } } \ No newline at end of file