mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-01 18:25:30 +00:00
aquatic_ws: run_handshakes_and_read_messages: simplify code
This commit is contained in:
parent
7e2f371007
commit
68abecdaa5
1 changed files with 34 additions and 42 deletions
|
|
@ -232,16 +232,41 @@ pub fn run_handshakes_and_read_messages(
|
||||||
valid_until: ValidUntil,
|
valid_until: ValidUntil,
|
||||||
){
|
){
|
||||||
loop {
|
loop {
|
||||||
let established = if let Some(c) = connections.get(&poll_token){
|
if let Some(Connection {
|
||||||
c.stage.is_established()
|
stage: ConnectionStage::EstablishedWs(established_ws),
|
||||||
} else {
|
..
|
||||||
break;
|
}) = connections.get_mut(&poll_token){
|
||||||
};
|
use ::tungstenite::Error::Io;
|
||||||
|
|
||||||
if !established {
|
match established_ws.ws.read_message(){
|
||||||
let conn = connections.remove(&poll_token).unwrap();
|
Ok(ws_message) => {
|
||||||
|
dbg!(ws_message.clone());
|
||||||
|
|
||||||
match conn.stage {
|
if let Some(in_message) = InMessage::from_ws_message(ws_message){
|
||||||
|
dbg!(in_message.clone());
|
||||||
|
|
||||||
|
let meta = ConnectionMeta {
|
||||||
|
worker_index: socket_worker_index,
|
||||||
|
poll_token: poll_token,
|
||||||
|
peer_addr: established_ws.peer_addr
|
||||||
|
};
|
||||||
|
|
||||||
|
in_message_sender.send((meta, in_message));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(Io(err)) if err.kind() == ErrorKind::WouldBlock => {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
dbg!(err);
|
||||||
|
|
||||||
|
remove_connection_if_exists(connections, poll_token);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if let Some(connection) = connections.remove(&poll_token) {
|
||||||
|
match connection.stage {
|
||||||
ConnectionStage::TcpStream(stream) => {
|
ConnectionStage::TcpStream(stream) => {
|
||||||
if let Some(tls_acceptor) = opt_tls_acceptor {
|
if let Some(tls_acceptor) = opt_tls_acceptor {
|
||||||
let stop_loop = handle_tls_handshake_result(
|
let stop_loop = handle_tls_handshake_result(
|
||||||
|
|
@ -315,39 +340,6 @@ pub fn run_handshakes_and_read_messages(
|
||||||
},
|
},
|
||||||
ConnectionStage::EstablishedWs(_) => unreachable!(),
|
ConnectionStage::EstablishedWs(_) => unreachable!(),
|
||||||
}
|
}
|
||||||
} else if let Some(Connection {
|
|
||||||
stage: ConnectionStage::EstablishedWs(established_ws),
|
|
||||||
..
|
|
||||||
}) = connections.get_mut(&poll_token){
|
|
||||||
use ::tungstenite::Error::Io;
|
|
||||||
|
|
||||||
match established_ws.ws.read_message(){
|
|
||||||
Ok(ws_message) => {
|
|
||||||
dbg!(ws_message.clone());
|
|
||||||
|
|
||||||
if let Some(in_message) = InMessage::from_ws_message(ws_message){
|
|
||||||
dbg!(in_message.clone());
|
|
||||||
|
|
||||||
let meta = ConnectionMeta {
|
|
||||||
worker_index: socket_worker_index,
|
|
||||||
poll_token: poll_token,
|
|
||||||
peer_addr: established_ws.peer_addr
|
|
||||||
};
|
|
||||||
|
|
||||||
in_message_sender.send((meta, in_message));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(Io(err)) if err.kind() == ErrorKind::WouldBlock => {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
dbg!(err);
|
|
||||||
|
|
||||||
remove_connection_if_exists(connections, poll_token);
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue