WIP: aquatic_ws: network: improve connection map & token code

This commit is contained in:
Joakim Frostegård 2020-05-11 13:45:17 +02:00
parent 003e5f2df9
commit a85a72ff66

View file

@ -3,10 +3,9 @@ use std::time::{Duration, Instant};
use std::io::ErrorKind; use std::io::ErrorKind;
use std::option::Option; use std::option::Option;
use slab::Slab;
use tungstenite::WebSocket; use tungstenite::WebSocket;
use tungstenite::handshake::{MidHandshake, HandshakeError, server::{ServerHandshake, NoCallback}}; use tungstenite::handshake::{MidHandshake, HandshakeError, server::{ServerHandshake, NoCallback}};
use indexmap::IndexMap; use hashbrown::HashMap;
use mio::{Events, Poll, Interest, Token}; use mio::{Events, Poll, Interest, Token};
use mio::net::{TcpListener, TcpStream}; use mio::net::{TcpListener, TcpStream};
@ -25,7 +24,6 @@ pub enum ConnectionStage {
Stream(TcpStream), Stream(TcpStream),
MidHandshake(MidHandshake<ServerHandshake<TcpStream, DebugCallback>>), MidHandshake(MidHandshake<ServerHandshake<TcpStream, DebugCallback>>),
Established(PeerConnection), Established(PeerConnection),
Placeholder
} }
@ -53,15 +51,9 @@ pub fn run_socket_worker(
let timeout = Duration::from_millis(50); // FIXME: config let timeout = Duration::from_millis(50); // FIXME: config
let mut connections: IndexMap<usize, Connection> = IndexMap::new(); let mut connections: HashMap<usize, Connection> = HashMap::new();
let placeholder = Connection { let mut poll_token_counter = 1usize;
valid_until: None,
stage: ConnectionStage::Placeholder,
};
// Insert empty first entry to prevent assignment of index 0
assert_eq!(connections.insert_full(0, placeholder).0, 0);
loop { loop {
poll.poll(&mut events, Some(timeout)) poll.poll(&mut events, Some(timeout))
@ -77,7 +69,8 @@ pub fn run_socket_worker(
&mut listener, &mut listener,
&mut poll, &mut poll,
&mut connections, &mut connections,
valid_until valid_until,
&mut poll_token_counter
); );
} else if event.is_readable(){ } else if event.is_readable(){
read_and_forward_in_messages( read_and_forward_in_messages(
@ -138,13 +131,20 @@ pub fn run_socket_worker(
fn accept_new_streams( fn accept_new_streams(
listener: &mut TcpListener, listener: &mut TcpListener,
poll: &mut Poll, poll: &mut Poll,
connections: &mut IndexMap<usize, Connection>, connections: &mut HashMap<usize, Connection>,
valid_until: ValidUntil, valid_until: ValidUntil,
poll_token_counter: &mut usize,
){ ){
loop { loop {
match listener.accept(){ match listener.accept(){
Ok((mut stream, src)) => { Ok((mut stream, _)) => {
let token = Token(connections.len()); *poll_token_counter = poll_token_counter.wrapping_add(1);
if *poll_token_counter == 0 {
*poll_token_counter = 1;
}
let token = Token(*poll_token_counter);
poll.registry() poll.registry()
.register(&mut stream, token, Interest::READABLE) .register(&mut stream, token, Interest::READABLE)
@ -189,18 +189,17 @@ pub fn read_and_forward_in_messages(
socket_worker_index: usize, socket_worker_index: usize,
in_message_sender: &InMessageSender, in_message_sender: &InMessageSender,
poll: &mut Poll, poll: &mut Poll,
connections: &mut IndexMap<usize, Connection>, connections: &mut HashMap<usize, Connection>,
poll_token: Token, poll_token: Token,
valid_until: ValidUntil, valid_until: ValidUntil,
){ ){
println!("poll_token: {}", poll_token.0); println!("poll_token: {}", poll_token.0);
loop { loop {
let established = match connections.get_index(poll_token.0).map(|(_, v)| &v.stage){ let established = match connections.get(&poll_token.0).map(|c| &c.stage){
Some(ConnectionStage::Stream(_)) => false, Some(ConnectionStage::Stream(_)) => false,
Some(ConnectionStage::MidHandshake(_)) => false, Some(ConnectionStage::MidHandshake(_)) => false,
Some(ConnectionStage::Established(_)) => true, Some(ConnectionStage::Established(_)) => true,
Some(ConnectionStage::Placeholder) => unreachable!(),
None => break, None => break,
}; };
@ -329,7 +328,7 @@ pub fn read_and_forward_in_messages(
pub fn send_out_messages( pub fn send_out_messages(
out_message_receiver: ::flume::Drain<(ConnectionMeta, OutMessage)>, out_message_receiver: ::flume::Drain<(ConnectionMeta, OutMessage)>,
poll: &mut Poll, poll: &mut Poll,
connections: &mut IndexMap<usize, Connection>, connections: &mut HashMap<usize, Connection>,
){ ){
// Read messages from channel, send to peers // Read messages from channel, send to peers
for (meta, out_message) in out_message_receiver { for (meta, out_message) in out_message_receiver {