From 0ec73d6cea8924569d73fc30b30b2ae9135c46c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 11 May 2020 13:55:28 +0200 Subject: [PATCH] aquatic_ws: network: ConnectionMap: use Token as key --- aquatic_ws/src/lib/common.rs | 3 ++- aquatic_ws/src/lib/network.rs | 47 +++++++++++++++++++---------------- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs index ce3687c..d5bf2e0 100644 --- a/aquatic_ws/src/lib/common.rs +++ b/aquatic_ws/src/lib/common.rs @@ -5,6 +5,7 @@ use flume::{Sender, Receiver}; use hashbrown::HashMap; use indexmap::IndexMap; use parking_lot::Mutex; +use mio::Token; pub use aquatic::common::ValidUntil; @@ -19,7 +20,7 @@ pub struct ConnectionMeta { /// SocketAddr of peer pub peer_socket_addr: SocketAddr, /// Slab index of PeerConnection - pub socket_worker_slab_index: usize, + pub socket_worker_poll_token: Token, } diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs index 0f09ac1..842f179 100644 --- a/aquatic_ws/src/lib/network.rs +++ b/aquatic_ws/src/lib/network.rs @@ -34,6 +34,9 @@ pub struct PeerConnection { } +pub type ConnectionMap = HashMap; + + pub fn run_socket_worker( address: SocketAddr, socket_worker_index: usize, @@ -51,9 +54,9 @@ pub fn run_socket_worker( let timeout = Duration::from_millis(50); // FIXME: config - let mut connections: HashMap = HashMap::new(); + let mut connections: ConnectionMap = HashMap::new(); - let mut poll_token_counter = 1usize; + let mut poll_token_counter = Token(0usize); loop { poll.poll(&mut events, Some(timeout)) @@ -131,20 +134,20 @@ pub fn run_socket_worker( fn accept_new_streams( listener: &mut TcpListener, poll: &mut Poll, - connections: &mut HashMap, + connections: &mut ConnectionMap, valid_until: ValidUntil, - poll_token_counter: &mut usize, + poll_token_counter: &mut Token, ){ loop { match listener.accept(){ Ok((mut stream, _)) => { - *poll_token_counter = poll_token_counter.wrapping_add(1); + poll_token_counter.0 = poll_token_counter.0.wrapping_add(1); - if *poll_token_counter == 0 { - *poll_token_counter = 1; + if poll_token_counter.0 == 0 { + poll_token_counter.0 = 1; } - let token = Token(*poll_token_counter); + let token = *poll_token_counter; poll.registry() .register(&mut stream, token, Interest::READABLE) @@ -155,7 +158,7 @@ fn accept_new_streams( stage: ConnectionStage::Stream(stream) }; - connections.insert(token.0, connection); + connections.insert(token, connection); }, Err(err) => { if err.kind() == ErrorKind::WouldBlock { @@ -189,14 +192,14 @@ pub fn read_and_forward_in_messages( socket_worker_index: usize, in_message_sender: &InMessageSender, poll: &mut Poll, - connections: &mut HashMap, + connections: &mut ConnectionMap, poll_token: Token, valid_until: ValidUntil, ){ println!("poll_token: {}", poll_token.0); loop { - let established = match connections.get(&poll_token.0).map(|c| &c.stage){ + let established = match connections.get(&poll_token).map(|c| &c.stage){ Some(ConnectionStage::Stream(_)) => false, Some(ConnectionStage::MidHandshake(_)) => false, Some(ConnectionStage::Established(_)) => true, @@ -204,7 +207,7 @@ pub fn read_and_forward_in_messages( }; if !established { - let conn = connections.remove(&poll_token.0).unwrap(); + let conn = connections.remove(&poll_token).unwrap(); match conn.stage { ConnectionStage::Stream(stream) => { @@ -224,7 +227,7 @@ pub fn read_and_forward_in_messages( stage: ConnectionStage::Established(peer_connection) }; - connections.insert(poll_token.0, connection); + connections.insert(poll_token, connection); }, Err(HandshakeError::Interrupted(handshake)) => { println!("interrupted"); @@ -234,7 +237,7 @@ pub fn read_and_forward_in_messages( stage: ConnectionStage::MidHandshake(handshake), }; - connections.insert(poll_token.0, connection); + connections.insert(poll_token, connection); break; }, @@ -261,7 +264,7 @@ pub fn read_and_forward_in_messages( stage: ConnectionStage::Established(peer_connection) }; - connections.insert(poll_token.0, connection); + connections.insert(poll_token, connection); }, Err(HandshakeError::Interrupted(handshake)) => { let connection = Connection { @@ -269,7 +272,7 @@ pub fn read_and_forward_in_messages( stage: ConnectionStage::MidHandshake(handshake), }; - connections.insert(poll_token.0, connection); + connections.insert(poll_token, connection); break; }, @@ -280,7 +283,7 @@ pub fn read_and_forward_in_messages( }, _ => unreachable!(), } - } else if let Some(Connection{ stage: ConnectionStage::Established(connection), ..}) = connections.get_mut(&poll_token.0){ + } else if let Some(Connection{ stage: ConnectionStage::Established(connection), ..}) = connections.get_mut(&poll_token){ println!("conn established"); match connection.ws.read_message(){ @@ -292,7 +295,7 @@ pub fn read_and_forward_in_messages( let meta = ConnectionMeta { socket_worker_index, - socket_worker_slab_index: poll_token.0, + socket_worker_poll_token: poll_token, peer_socket_addr: connection.peer_socket_addr }; @@ -314,7 +317,7 @@ pub fn read_and_forward_in_messages( .deregister(connection.ws.get_mut()) .unwrap(); - connections.remove(&poll_token.0); + connections.remove(&poll_token); }, Err(err) => { eprint!("{}", err); @@ -328,12 +331,12 @@ pub fn read_and_forward_in_messages( pub fn send_out_messages( out_message_receiver: ::flume::Drain<(ConnectionMeta, OutMessage)>, poll: &mut Poll, - connections: &mut HashMap, + connections: &mut ConnectionMap, ){ // Read messages from channel, send to peers for (meta, out_message) in out_message_receiver { let opt_connection = connections - .get_mut(&meta.socket_worker_slab_index) + .get_mut(&meta.socket_worker_poll_token) .map(|v| &mut v.stage); if let Some(ConnectionStage::Established(connection)) = opt_connection { @@ -360,7 +363,7 @@ pub fn send_out_messages( .deregister(connection.ws.get_mut()) .unwrap(); - connections.remove(&meta.socket_worker_slab_index); + connections.remove(&meta.socket_worker_poll_token); }, Err(err) => { eprint!("{}", err);