aquatic_ws: network: ConnectionMap: use Token as key

This commit is contained in:
Joakim Frostegård 2020-05-11 13:55:28 +02:00
parent a85a72ff66
commit 0ec73d6cea
2 changed files with 27 additions and 23 deletions

View file

@ -5,6 +5,7 @@ use flume::{Sender, Receiver};
use hashbrown::HashMap; use hashbrown::HashMap;
use indexmap::IndexMap; use indexmap::IndexMap;
use parking_lot::Mutex; use parking_lot::Mutex;
use mio::Token;
pub use aquatic::common::ValidUntil; pub use aquatic::common::ValidUntil;
@ -19,7 +20,7 @@ pub struct ConnectionMeta {
/// SocketAddr of peer /// SocketAddr of peer
pub peer_socket_addr: SocketAddr, pub peer_socket_addr: SocketAddr,
/// Slab index of PeerConnection /// Slab index of PeerConnection
pub socket_worker_slab_index: usize, pub socket_worker_poll_token: Token,
} }

View file

@ -34,6 +34,9 @@ pub struct PeerConnection {
} }
pub type ConnectionMap = HashMap<Token, Connection>;
pub fn run_socket_worker( pub fn run_socket_worker(
address: SocketAddr, address: SocketAddr,
socket_worker_index: usize, socket_worker_index: usize,
@ -51,9 +54,9 @@ pub fn run_socket_worker(
let timeout = Duration::from_millis(50); // FIXME: config let timeout = Duration::from_millis(50); // FIXME: config
let mut connections: HashMap<usize, Connection> = HashMap::new(); let mut connections: ConnectionMap = HashMap::new();
let mut poll_token_counter = 1usize; let mut poll_token_counter = Token(0usize);
loop { loop {
poll.poll(&mut events, Some(timeout)) poll.poll(&mut events, Some(timeout))
@ -131,20 +134,20 @@ pub fn run_socket_worker(
fn accept_new_streams( fn accept_new_streams(
listener: &mut TcpListener, listener: &mut TcpListener,
poll: &mut Poll, poll: &mut Poll,
connections: &mut HashMap<usize, Connection>, connections: &mut ConnectionMap,
valid_until: ValidUntil, valid_until: ValidUntil,
poll_token_counter: &mut usize, poll_token_counter: &mut Token,
){ ){
loop { loop {
match listener.accept(){ match listener.accept(){
Ok((mut stream, _)) => { Ok((mut stream, _)) => {
*poll_token_counter = poll_token_counter.wrapping_add(1); poll_token_counter.0 = poll_token_counter.0.wrapping_add(1);
if *poll_token_counter == 0 { if poll_token_counter.0 == 0 {
*poll_token_counter = 1; poll_token_counter.0 = 1;
} }
let token = Token(*poll_token_counter); let token = *poll_token_counter;
poll.registry() poll.registry()
.register(&mut stream, token, Interest::READABLE) .register(&mut stream, token, Interest::READABLE)
@ -155,7 +158,7 @@ fn accept_new_streams(
stage: ConnectionStage::Stream(stream) stage: ConnectionStage::Stream(stream)
}; };
connections.insert(token.0, connection); connections.insert(token, connection);
}, },
Err(err) => { Err(err) => {
if err.kind() == ErrorKind::WouldBlock { if err.kind() == ErrorKind::WouldBlock {
@ -189,14 +192,14 @@ pub fn read_and_forward_in_messages(
socket_worker_index: usize, socket_worker_index: usize,
in_message_sender: &InMessageSender, in_message_sender: &InMessageSender,
poll: &mut Poll, poll: &mut Poll,
connections: &mut HashMap<usize, Connection>, connections: &mut ConnectionMap,
poll_token: Token, poll_token: Token,
valid_until: ValidUntil, valid_until: ValidUntil,
){ ){
println!("poll_token: {}", poll_token.0); println!("poll_token: {}", poll_token.0);
loop { loop {
let established = match connections.get(&poll_token.0).map(|c| &c.stage){ let established = match connections.get(&poll_token).map(|c| &c.stage){
Some(ConnectionStage::Stream(_)) => false, Some(ConnectionStage::Stream(_)) => false,
Some(ConnectionStage::MidHandshake(_)) => false, Some(ConnectionStage::MidHandshake(_)) => false,
Some(ConnectionStage::Established(_)) => true, Some(ConnectionStage::Established(_)) => true,
@ -204,7 +207,7 @@ pub fn read_and_forward_in_messages(
}; };
if !established { if !established {
let conn = connections.remove(&poll_token.0).unwrap(); let conn = connections.remove(&poll_token).unwrap();
match conn.stage { match conn.stage {
ConnectionStage::Stream(stream) => { ConnectionStage::Stream(stream) => {
@ -224,7 +227,7 @@ pub fn read_and_forward_in_messages(
stage: ConnectionStage::Established(peer_connection) stage: ConnectionStage::Established(peer_connection)
}; };
connections.insert(poll_token.0, connection); connections.insert(poll_token, connection);
}, },
Err(HandshakeError::Interrupted(handshake)) => { Err(HandshakeError::Interrupted(handshake)) => {
println!("interrupted"); println!("interrupted");
@ -234,7 +237,7 @@ pub fn read_and_forward_in_messages(
stage: ConnectionStage::MidHandshake(handshake), stage: ConnectionStage::MidHandshake(handshake),
}; };
connections.insert(poll_token.0, connection); connections.insert(poll_token, connection);
break; break;
}, },
@ -261,7 +264,7 @@ pub fn read_and_forward_in_messages(
stage: ConnectionStage::Established(peer_connection) stage: ConnectionStage::Established(peer_connection)
}; };
connections.insert(poll_token.0, connection); connections.insert(poll_token, connection);
}, },
Err(HandshakeError::Interrupted(handshake)) => { Err(HandshakeError::Interrupted(handshake)) => {
let connection = Connection { let connection = Connection {
@ -269,7 +272,7 @@ pub fn read_and_forward_in_messages(
stage: ConnectionStage::MidHandshake(handshake), stage: ConnectionStage::MidHandshake(handshake),
}; };
connections.insert(poll_token.0, connection); connections.insert(poll_token, connection);
break; break;
}, },
@ -280,7 +283,7 @@ pub fn read_and_forward_in_messages(
}, },
_ => unreachable!(), _ => unreachable!(),
} }
} else if let Some(Connection{ stage: ConnectionStage::Established(connection), ..}) = connections.get_mut(&poll_token.0){ } else if let Some(Connection{ stage: ConnectionStage::Established(connection), ..}) = connections.get_mut(&poll_token){
println!("conn established"); println!("conn established");
match connection.ws.read_message(){ match connection.ws.read_message(){
@ -292,7 +295,7 @@ pub fn read_and_forward_in_messages(
let meta = ConnectionMeta { let meta = ConnectionMeta {
socket_worker_index, socket_worker_index,
socket_worker_slab_index: poll_token.0, socket_worker_poll_token: poll_token,
peer_socket_addr: connection.peer_socket_addr peer_socket_addr: connection.peer_socket_addr
}; };
@ -314,7 +317,7 @@ pub fn read_and_forward_in_messages(
.deregister(connection.ws.get_mut()) .deregister(connection.ws.get_mut())
.unwrap(); .unwrap();
connections.remove(&poll_token.0); connections.remove(&poll_token);
}, },
Err(err) => { Err(err) => {
eprint!("{}", err); eprint!("{}", err);
@ -328,12 +331,12 @@ pub fn read_and_forward_in_messages(
pub fn send_out_messages( pub fn send_out_messages(
out_message_receiver: ::flume::Drain<(ConnectionMeta, OutMessage)>, out_message_receiver: ::flume::Drain<(ConnectionMeta, OutMessage)>,
poll: &mut Poll, poll: &mut Poll,
connections: &mut HashMap<usize, Connection>, connections: &mut ConnectionMap,
){ ){
// Read messages from channel, send to peers // Read messages from channel, send to peers
for (meta, out_message) in out_message_receiver { for (meta, out_message) in out_message_receiver {
let opt_connection = connections let opt_connection = connections
.get_mut(&meta.socket_worker_slab_index) .get_mut(&meta.socket_worker_poll_token)
.map(|v| &mut v.stage); .map(|v| &mut v.stage);
if let Some(ConnectionStage::Established(connection)) = opt_connection { if let Some(ConnectionStage::Established(connection)) = opt_connection {
@ -360,7 +363,7 @@ pub fn send_out_messages(
.deregister(connection.ws.get_mut()) .deregister(connection.ws.get_mut())
.unwrap(); .unwrap();
connections.remove(&meta.socket_worker_slab_index); connections.remove(&meta.socket_worker_poll_token);
}, },
Err(err) => { Err(err) => {
eprint!("{}", err); eprint!("{}", err);