mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-02 10:45:30 +00:00
WIP: aquatic_ws; work on simplifying network code
This commit is contained in:
parent
9c15a97975
commit
91590858b9
3 changed files with 131 additions and 210 deletions
|
|
@ -119,10 +119,12 @@ fn accept_new_streams(
|
|||
poll.registry()
|
||||
.register(&mut stream, token, Interest::READABLE)
|
||||
.unwrap();
|
||||
|
||||
let stream = Stream::TcpStream(stream);
|
||||
|
||||
let connection = Connection {
|
||||
valid_until,
|
||||
stage: ConnectionStage::TcpStream(stream)
|
||||
stage: ConnectionStage::Stream(stream)
|
||||
};
|
||||
|
||||
connections.insert(token, connection);
|
||||
|
|
@ -139,94 +141,7 @@ fn accept_new_streams(
|
|||
}
|
||||
|
||||
|
||||
pub fn handle_tls_handshake_result(
|
||||
connections: &mut ConnectionMap,
|
||||
poll_token: Token,
|
||||
valid_until: ValidUntil,
|
||||
result: Result<TlsStream<TcpStream>, native_tls::HandshakeError<TcpStream>>,
|
||||
) -> bool {
|
||||
match result {
|
||||
Ok(stream) => {
|
||||
println!("handshake established");
|
||||
|
||||
let connection = Connection {
|
||||
valid_until,
|
||||
stage: ConnectionStage::TlsStream(stream)
|
||||
};
|
||||
|
||||
connections.insert(poll_token, connection);
|
||||
|
||||
false
|
||||
},
|
||||
Err(native_tls::HandshakeError::WouldBlock(handshake)) => {
|
||||
println!("interrupted");
|
||||
|
||||
let connection = Connection {
|
||||
valid_until,
|
||||
stage: ConnectionStage::TlsMidHandshake(handshake),
|
||||
};
|
||||
|
||||
connections.insert(poll_token, connection);
|
||||
|
||||
true
|
||||
},
|
||||
Err(native_tls::HandshakeError::Failure(err)) => {
|
||||
dbg!(err);
|
||||
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub fn handle_ws_handshake_no_tls_result(
|
||||
connections: &mut ConnectionMap,
|
||||
poll_token: Token,
|
||||
valid_until: ValidUntil,
|
||||
result: Result<WebSocket<TcpStream>, HandshakeError<ServerHandshake<TcpStream, DebugCallback>>> ,
|
||||
) -> bool {
|
||||
match result {
|
||||
Ok(mut ws) => {
|
||||
println!("handshake established");
|
||||
|
||||
let peer_addr = ws.get_mut().peer_addr().unwrap();
|
||||
|
||||
let established_ws = EstablishedWs {
|
||||
ws,
|
||||
peer_addr,
|
||||
};
|
||||
|
||||
let connection = Connection {
|
||||
valid_until,
|
||||
stage: ConnectionStage::EstablishedWsNoTls(established_ws)
|
||||
};
|
||||
|
||||
connections.insert(poll_token, connection);
|
||||
|
||||
false
|
||||
},
|
||||
Err(HandshakeError::Interrupted(handshake)) => {
|
||||
println!("interrupted");
|
||||
|
||||
let connection = Connection {
|
||||
valid_until,
|
||||
stage: ConnectionStage::WsHandshakeNoTls(handshake),
|
||||
};
|
||||
|
||||
connections.insert(poll_token, connection);
|
||||
|
||||
true
|
||||
},
|
||||
Err(HandshakeError::Failure(err)) => {
|
||||
dbg!(err);
|
||||
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub fn handle_ws_handshake_tls_result(
|
||||
pub fn handle_ws_handshake_result(
|
||||
connections: &mut ConnectionMap,
|
||||
poll_token: Token,
|
||||
valid_until: ValidUntil,
|
||||
|
|
@ -236,7 +151,7 @@ pub fn handle_ws_handshake_tls_result(
|
|||
Ok(mut ws) => {
|
||||
println!("handshake established");
|
||||
|
||||
let peer_addr = ws.get_mut().get_mut().peer_addr().unwrap();
|
||||
let peer_addr = ws.get_mut().get_peer_addr();
|
||||
|
||||
let established_ws = EstablishedWs {
|
||||
ws,
|
||||
|
|
@ -245,7 +160,7 @@ pub fn handle_ws_handshake_tls_result(
|
|||
|
||||
let connection = Connection {
|
||||
valid_until,
|
||||
stage: ConnectionStage::EstablishedWsTls(established_ws)
|
||||
stage: ConnectionStage::EstablishedWs(established_ws)
|
||||
};
|
||||
|
||||
connections.insert(poll_token, connection);
|
||||
|
|
@ -257,7 +172,7 @@ pub fn handle_ws_handshake_tls_result(
|
|||
|
||||
let connection = Connection {
|
||||
valid_until,
|
||||
stage: ConnectionStage::WsHandshakeTls(handshake),
|
||||
stage: ConnectionStage::WsHandshake(handshake),
|
||||
};
|
||||
|
||||
connections.insert(poll_token, connection);
|
||||
|
|
@ -353,25 +268,42 @@ pub fn run_handshakes_and_read_messages(
|
|||
let conn = connections.remove(&poll_token).unwrap();
|
||||
|
||||
match conn.stage {
|
||||
ConnectionStage::TcpStream(stream) => {
|
||||
ConnectionStage::Stream(Stream::TcpStream(stream)) => {
|
||||
if let Some(tls_acceptor) = opt_tls_acceptor {
|
||||
let stop_loop = handle_tls_handshake_result(
|
||||
connections,
|
||||
poll_token,
|
||||
valid_until,
|
||||
tls_acceptor.accept(stream)
|
||||
);
|
||||
|
||||
if stop_loop {
|
||||
break;
|
||||
match tls_acceptor.accept(stream){
|
||||
Ok(stream) => {
|
||||
println!("handshake established");
|
||||
|
||||
let connection = Connection {
|
||||
valid_until,
|
||||
stage: ConnectionStage::Stream(Stream::TlsStream(stream))
|
||||
};
|
||||
|
||||
connections.insert(poll_token, connection);
|
||||
},
|
||||
Err(native_tls::HandshakeError::WouldBlock(handshake)) => {
|
||||
println!("interrupted");
|
||||
|
||||
let connection = Connection {
|
||||
valid_until,
|
||||
stage: ConnectionStage::TlsMidHandshake(handshake),
|
||||
};
|
||||
|
||||
connections.insert(poll_token, connection);
|
||||
|
||||
break
|
||||
},
|
||||
Err(native_tls::HandshakeError::Failure(err)) => {
|
||||
dbg!(err);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let handshake_result = ::tungstenite::server::accept_hdr(
|
||||
stream,
|
||||
Stream::TcpStream(stream),
|
||||
DebugCallback
|
||||
);
|
||||
|
||||
let stop_loop = handle_ws_handshake_no_tls_result(
|
||||
let stop_loop = handle_ws_handshake_result(
|
||||
connections,
|
||||
poll_token,
|
||||
valid_until,
|
||||
|
|
@ -383,25 +315,13 @@ pub fn run_handshakes_and_read_messages(
|
|||
}
|
||||
}
|
||||
},
|
||||
ConnectionStage::TlsMidHandshake(handshake) => {
|
||||
let stop_loop = handle_tls_handshake_result(
|
||||
connections,
|
||||
poll_token,
|
||||
valid_until,
|
||||
handshake.handshake()
|
||||
);
|
||||
|
||||
if stop_loop {
|
||||
break;
|
||||
}
|
||||
},
|
||||
ConnectionStage::TlsStream(stream) => {
|
||||
ConnectionStage::Stream(Stream::TlsStream(stream)) => {
|
||||
let handshake_result = ::tungstenite::server::accept_hdr(
|
||||
stream,
|
||||
Stream::TlsStream(stream),
|
||||
DebugCallback
|
||||
);
|
||||
|
||||
let stop_loop = handle_ws_handshake_tls_result(
|
||||
let stop_loop = handle_ws_handshake_result(
|
||||
connections,
|
||||
poll_token,
|
||||
valid_until,
|
||||
|
|
@ -412,8 +332,37 @@ pub fn run_handshakes_and_read_messages(
|
|||
break;
|
||||
}
|
||||
},
|
||||
ConnectionStage::WsHandshakeNoTls(handshake) => {
|
||||
let stop_loop = handle_ws_handshake_no_tls_result(
|
||||
ConnectionStage::TlsMidHandshake(handshake) => {
|
||||
match handshake.handshake() {
|
||||
Ok(stream) => {
|
||||
println!("handshake established");
|
||||
|
||||
let connection = Connection {
|
||||
valid_until,
|
||||
stage: ConnectionStage::Stream(Stream::TlsStream(stream))
|
||||
};
|
||||
|
||||
connections.insert(poll_token, connection);
|
||||
},
|
||||
Err(native_tls::HandshakeError::WouldBlock(handshake)) => {
|
||||
println!("interrupted");
|
||||
|
||||
let connection = Connection {
|
||||
valid_until,
|
||||
stage: ConnectionStage::TlsMidHandshake(handshake),
|
||||
};
|
||||
|
||||
connections.insert(poll_token, connection);
|
||||
|
||||
break
|
||||
},
|
||||
Err(native_tls::HandshakeError::Failure(err)) => {
|
||||
dbg!(err);
|
||||
}
|
||||
}
|
||||
},
|
||||
ConnectionStage::WsHandshake(handshake) => {
|
||||
let stop_loop = handle_ws_handshake_result(
|
||||
connections,
|
||||
poll_token,
|
||||
valid_until,
|
||||
|
|
@ -424,38 +373,12 @@ pub fn run_handshakes_and_read_messages(
|
|||
break;
|
||||
}
|
||||
},
|
||||
ConnectionStage::WsHandshakeTls(handshake) => {
|
||||
let stop_loop = handle_ws_handshake_tls_result(
|
||||
connections,
|
||||
poll_token,
|
||||
valid_until,
|
||||
handshake.handshake()
|
||||
);
|
||||
|
||||
if stop_loop {
|
||||
break;
|
||||
}
|
||||
},
|
||||
ConnectionStage::EstablishedWsNoTls(_) => unreachable!(),
|
||||
ConnectionStage::EstablishedWsTls(_) => unreachable!(),
|
||||
ConnectionStage::EstablishedWs(_) => unreachable!(),
|
||||
}
|
||||
} else {
|
||||
match connections.get_mut(&poll_token){
|
||||
Some(Connection{
|
||||
stage: ConnectionStage::EstablishedWsNoTls(established_ws),
|
||||
..
|
||||
}) => {
|
||||
read_ws_messages!(
|
||||
socket_worker_index,
|
||||
in_message_sender,
|
||||
poll,
|
||||
connections,
|
||||
poll_token,
|
||||
established_ws
|
||||
);
|
||||
},
|
||||
Some(Connection{
|
||||
stage: ConnectionStage::EstablishedWsTls(established_ws),
|
||||
stage: ConnectionStage::EstablishedWs(established_ws),
|
||||
..
|
||||
}) => {
|
||||
read_ws_messages!(
|
||||
|
|
@ -489,32 +412,7 @@ pub fn send_out_messages(
|
|||
|
||||
// Exactly the same for both established stages
|
||||
match opt_stage {
|
||||
Some(ConnectionStage::EstablishedWsNoTls(connection)) => {
|
||||
if connection.peer_addr != meta.peer_addr {
|
||||
eprintln!("socket worker: peer socket addrs didn't match");
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
dbg!(out_message.clone());
|
||||
|
||||
match connection.ws.write_message(out_message.to_ws_message()){
|
||||
Ok(()) => {},
|
||||
Err(Io(err)) if err.kind() == ErrorKind::WouldBlock => {
|
||||
continue;
|
||||
},
|
||||
Err(err) => {
|
||||
dbg!(err);
|
||||
|
||||
remove_connection_if_exists(
|
||||
poll,
|
||||
connections,
|
||||
meta.poll_token
|
||||
);
|
||||
},
|
||||
}
|
||||
},
|
||||
Some(ConnectionStage::EstablishedWsTls(connection)) => {
|
||||
Some(ConnectionStage::EstablishedWs(connection)) => {
|
||||
if connection.peer_addr != meta.peer_addr {
|
||||
eprintln!("socket worker: peer socket addrs didn't match");
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue