WIP: aquatic_ws: network: rename PeerConnection to EstablishedWs

This commit is contained in:
Joakim Frostegård 2020-05-12 20:31:07 +02:00
parent b221f3fc34
commit 6e0b0ccbc2
2 changed files with 38 additions and 44 deletions

View file

@ -35,7 +35,7 @@ pub fn run(config: Config){
i, i,
in_message_sender, in_message_sender,
out_message_receiver, out_message_receiver,
true false
); );
}); });
} }

View file

@ -21,10 +21,9 @@ use crate::protocol::*;
pub type Stream = TlsStream<TcpStream>; pub type Stream = TlsStream<TcpStream>;
pub struct PeerConnection<S> { pub struct EstablishedWs<S> {
pub ws: WebSocket<S>, pub ws: WebSocket<S>,
pub peer_addr: SocketAddr, pub peer_addr: SocketAddr,
pub valid_until: ValidUntil,
} }
@ -34,8 +33,8 @@ pub enum ConnectionStage {
TlsStream(Stream), TlsStream(Stream),
WsHandshakeNoTls(MidHandshake<ServerHandshake<TcpStream, DebugCallback>>), WsHandshakeNoTls(MidHandshake<ServerHandshake<TcpStream, DebugCallback>>),
WsHandshakeTls(MidHandshake<ServerHandshake<Stream, DebugCallback>>), WsHandshakeTls(MidHandshake<ServerHandshake<Stream, DebugCallback>>),
EstablishedNoTls(PeerConnection<TcpStream>), EstablishedWsNoTls(EstablishedWs<TcpStream>),
EstablishedTls(PeerConnection<Stream>), EstablishedWsTls(EstablishedWs<Stream>),
} }
@ -95,32 +94,32 @@ fn close_and_deregister_connection(
.deregister(handshake.get_mut().get_mut().get_mut()) .deregister(handshake.get_mut().get_mut().get_mut())
.unwrap(); .unwrap();
}, },
ConnectionStage::EstablishedNoTls(ref mut peer_connection) => { ConnectionStage::EstablishedWsNoTls(ref mut established_ws) => {
if peer_connection.ws.can_read(){ if established_ws.ws.can_read(){
peer_connection.ws.close(None).unwrap(); established_ws.ws.close(None).unwrap();
// Needs to be done after ws.close() // Needs to be done after ws.close()
if let Err(err) = peer_connection.ws.write_pending(){ if let Err(err) = established_ws.ws.write_pending(){
dbg!(err); dbg!(err);
} }
} }
poll.registry() poll.registry()
.deregister(peer_connection.ws.get_mut()) .deregister(established_ws.ws.get_mut())
.unwrap(); .unwrap();
}, },
ConnectionStage::EstablishedTls(ref mut peer_connection) => { ConnectionStage::EstablishedWsTls(ref mut established_ws) => {
if peer_connection.ws.can_read(){ if established_ws.ws.can_read(){
peer_connection.ws.close(None).unwrap(); established_ws.ws.close(None).unwrap();
// Needs to be done after ws.close() // Needs to be done after ws.close()
if let Err(err) = peer_connection.ws.write_pending(){ if let Err(err) = established_ws.ws.write_pending(){
dbg!(err); dbg!(err);
} }
} }
poll.registry() poll.registry()
.deregister(peer_connection.ws.get_mut().get_mut()) .deregister(established_ws.ws.get_mut().get_mut())
.unwrap(); .unwrap();
}, },
} }
@ -230,7 +229,11 @@ pub fn run_socket_worker(
.register(&mut listener, Token(0), Interest::READABLE) .register(&mut listener, Token(0), Interest::READABLE)
.unwrap(); .unwrap();
let tls_acceptor = create_tls_acceptor(&config); let opt_tls_acceptor = if use_tls {
Some(create_tls_acceptor(&config))
} else {
None
};
let mut connections: ConnectionMap = HashMap::new(); let mut connections: ConnectionMap = HashMap::new();
@ -258,12 +261,11 @@ pub fn run_socket_worker(
run_handshakes_and_read_messages( run_handshakes_and_read_messages(
socket_worker_index, socket_worker_index,
&in_message_sender, &in_message_sender,
&tls_acceptor, &opt_tls_acceptor,
&mut poll, &mut poll,
&mut connections, &mut connections,
token, token,
valid_until, valid_until,
use_tls
); );
} }
} }
@ -380,15 +382,14 @@ pub fn handle_ws_handshake_no_tls_result(
let peer_addr = ws.get_mut().peer_addr().unwrap(); let peer_addr = ws.get_mut().peer_addr().unwrap();
let peer_connection = PeerConnection { let established_ws = EstablishedWs {
ws, ws,
peer_addr, peer_addr,
valid_until,
}; };
let connection = Connection { let connection = Connection {
valid_until, valid_until,
stage: ConnectionStage::EstablishedNoTls(peer_connection) stage: ConnectionStage::EstablishedWsNoTls(established_ws)
}; };
connections.insert(poll_token, connection); connections.insert(poll_token, connection);
@ -428,15 +429,14 @@ pub fn handle_ws_handshake_tls_result(
let peer_addr = ws.get_mut().get_mut().peer_addr().unwrap(); let peer_addr = ws.get_mut().get_mut().peer_addr().unwrap();
let peer_connection = PeerConnection { let established_ws = EstablishedWs {
ws, ws,
peer_addr, peer_addr,
valid_until,
}; };
let connection = Connection { let connection = Connection {
valid_until, valid_until,
stage: ConnectionStage::EstablishedTls(peer_connection) stage: ConnectionStage::EstablishedWsTls(established_ws)
}; };
connections.insert(poll_token, connection); connections.insert(poll_token, connection);
@ -473,12 +473,11 @@ macro_rules! read_ws_messages {
$poll: ident, $poll: ident,
$connections: ident, $connections: ident,
$poll_token: ident, $poll_token: ident,
$valid_until: ident, $established_ws: ident
$peer_connection: ident
) => { ) => {
println!("conn established"); println!("conn established");
match $peer_connection.ws.read_message(){ match $established_ws.ws.read_message(){
Ok(ws_message) => { Ok(ws_message) => {
dbg!(ws_message.clone()); dbg!(ws_message.clone());
@ -488,13 +487,11 @@ macro_rules! read_ws_messages {
let meta = ConnectionMeta { let meta = ConnectionMeta {
worker_index: $socket_worker_index, worker_index: $socket_worker_index,
poll_token: $poll_token, poll_token: $poll_token,
peer_addr: $peer_connection.peer_addr peer_addr: $established_ws.peer_addr
}; };
$in_message_sender.send((meta, in_message)); $in_message_sender.send((meta, in_message));
} }
$peer_connection.valid_until = $valid_until;
}, },
Err(tungstenite::Error::Io(err)) => { Err(tungstenite::Error::Io(err)) => {
if err.kind() == ErrorKind::WouldBlock { if err.kind() == ErrorKind::WouldBlock {
@ -527,19 +524,18 @@ macro_rules! read_ws_messages {
pub fn run_handshakes_and_read_messages( pub fn run_handshakes_and_read_messages(
socket_worker_index: usize, socket_worker_index: usize,
in_message_sender: &InMessageSender, in_message_sender: &InMessageSender,
tls_acceptor: &TlsAcceptor, opt_tls_acceptor: &Option<TlsAcceptor>, // If set, run TLS
poll: &mut Poll, poll: &mut Poll,
connections: &mut ConnectionMap, connections: &mut ConnectionMap,
poll_token: Token, poll_token: Token,
valid_until: ValidUntil, valid_until: ValidUntil,
use_tls: bool
){ ){
println!("poll_token: {}", poll_token.0); println!("poll_token: {}", poll_token.0);
loop { loop {
let established = match connections.get(&poll_token).map(|c| &c.stage){ let established = match connections.get(&poll_token).map(|c| &c.stage){
Some(ConnectionStage::EstablishedTls(_)) => true, Some(ConnectionStage::EstablishedWsTls(_)) => true,
Some(ConnectionStage::EstablishedNoTls(_)) => true, Some(ConnectionStage::EstablishedWsNoTls(_)) => true,
Some(_) => false, Some(_) => false,
None => break, None => break,
}; };
@ -549,7 +545,7 @@ pub fn run_handshakes_and_read_messages(
match conn.stage { match conn.stage {
ConnectionStage::TcpStream(stream) => { ConnectionStage::TcpStream(stream) => {
if use_tls { if let Some(tls_acceptor) = opt_tls_acceptor {
let stop_loop = handle_tls_handshake_result( let stop_loop = handle_tls_handshake_result(
connections, connections,
poll_token, poll_token,
@ -631,13 +627,13 @@ pub fn run_handshakes_and_read_messages(
break; break;
} }
}, },
ConnectionStage::EstablishedNoTls(_) => unreachable!(), ConnectionStage::EstablishedWsNoTls(_) => unreachable!(),
ConnectionStage::EstablishedTls(_) => unreachable!(), ConnectionStage::EstablishedWsTls(_) => unreachable!(),
} }
} else { } else {
match connections.get_mut(&poll_token){ match connections.get_mut(&poll_token){
Some(Connection{ Some(Connection{
stage: ConnectionStage::EstablishedNoTls(peer_connection), stage: ConnectionStage::EstablishedWsNoTls(established_ws),
.. ..
}) => { }) => {
read_ws_messages!( read_ws_messages!(
@ -646,12 +642,11 @@ pub fn run_handshakes_and_read_messages(
poll, poll,
connections, connections,
poll_token, poll_token,
valid_until, established_ws
peer_connection
); );
}, },
Some(Connection{ Some(Connection{
stage: ConnectionStage::EstablishedTls(peer_connection), stage: ConnectionStage::EstablishedWsTls(established_ws),
.. ..
}) => { }) => {
read_ws_messages!( read_ws_messages!(
@ -660,8 +655,7 @@ pub fn run_handshakes_and_read_messages(
poll, poll,
connections, connections,
poll_token, poll_token,
valid_until, established_ws
peer_connection
); );
}, },
_ => () _ => ()
@ -682,7 +676,7 @@ pub fn send_out_messages(
.get_mut(&meta.poll_token) .get_mut(&meta.poll_token)
.map(|v| &mut v.stage); .map(|v| &mut v.stage);
if let Some(ConnectionStage::EstablishedTls(connection)) = opt_connection { if let Some(ConnectionStage::EstablishedWsTls(connection)) = opt_connection {
if connection.peer_addr != meta.peer_addr { if connection.peer_addr != meta.peer_addr {
eprintln!("socket worker: peer socket addrs didn't match"); eprintln!("socket worker: peer socket addrs didn't match");