mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
aquatic_ws: move code into Connection impl, so inner can be private
This commit is contained in:
parent
0bcfffb2bd
commit
846e076b4b
3 changed files with 62 additions and 38 deletions
|
|
@ -183,7 +183,51 @@ impl HandshakeMachine {
|
|||
|
||||
pub struct Connection {
|
||||
pub valid_until: ValidUntil,
|
||||
pub inner: Either<EstablishedWs, HandshakeMachine>,
|
||||
inner: Either<EstablishedWs, HandshakeMachine>,
|
||||
}
|
||||
|
||||
|
||||
impl Connection {
|
||||
pub fn new(
|
||||
valid_until: ValidUntil,
|
||||
inner: Either<EstablishedWs, HandshakeMachine>
|
||||
) -> Self {
|
||||
Self {
|
||||
valid_until,
|
||||
inner
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_established(&self) -> bool {
|
||||
self.inner.is_left()
|
||||
}
|
||||
|
||||
pub fn get_established_ws<'a>(&mut self) -> Option<&mut EstablishedWs> {
|
||||
match self.inner {
|
||||
Either::Left(ref mut ews) => Some(ews),
|
||||
Either::Right(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_machine(self) -> Option<HandshakeMachine> {
|
||||
match self.inner {
|
||||
Either::Left(_) => None,
|
||||
Either::Right(machine) => Some(machine),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn close(&mut self){
|
||||
if let Either::Left(ref mut ews) = self.inner {
|
||||
if ews.ws.can_read(){
|
||||
ews.ws.close(None).unwrap();
|
||||
|
||||
// Needs to be done after ws.close()
|
||||
if let Err(err) = ews.ws.write_pending(){
|
||||
dbg!(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -116,10 +116,10 @@ fn accept_new_streams(
|
|||
.register(&mut stream, token, Interest::READABLE)
|
||||
.unwrap();
|
||||
|
||||
let connection = Connection {
|
||||
let connection = Connection::new(
|
||||
valid_until,
|
||||
inner: Either::Right(HandshakeMachine::new(stream))
|
||||
};
|
||||
Either::Right(HandshakeMachine::new(stream))
|
||||
);
|
||||
|
||||
connections.insert(token, connection);
|
||||
},
|
||||
|
|
@ -147,10 +147,9 @@ pub fn run_handshakes_and_read_messages(
|
|||
valid_until: ValidUntil,
|
||||
){
|
||||
loop {
|
||||
if let Some(Connection {
|
||||
inner: Either::Left(established_ws),
|
||||
..
|
||||
}) = connections.get_mut(&poll_token){
|
||||
if let Some(established_ws) = connections.get_mut(&poll_token)
|
||||
.and_then(Connection::get_established_ws)
|
||||
{
|
||||
use ::tungstenite::Error::Io;
|
||||
|
||||
match established_ws.ws.read_message(){
|
||||
|
|
@ -180,18 +179,14 @@ pub fn run_handshakes_and_read_messages(
|
|||
break;
|
||||
}
|
||||
}
|
||||
} else if let Some(Connection {
|
||||
inner: Either::Right(machine),
|
||||
..
|
||||
}) = connections.remove(&poll_token) {
|
||||
} else if let Some(machine) = connections.remove(&poll_token)
|
||||
.and_then(Connection::get_machine)
|
||||
{
|
||||
let (result, stop_loop) = machine
|
||||
.advance(opt_tls_acceptor);
|
||||
|
||||
if let Some(inner) = result {
|
||||
let connection = Connection {
|
||||
valid_until,
|
||||
inner,
|
||||
};
|
||||
let connection = Connection::new(valid_until, inner);
|
||||
|
||||
connections.insert(poll_token, connection);
|
||||
}
|
||||
|
|
@ -210,12 +205,11 @@ pub fn send_out_messages(
|
|||
connections: &mut ConnectionMap,
|
||||
){
|
||||
for (meta, out_message) in out_message_receiver {
|
||||
let opt_inner = connections
|
||||
.get_mut(&meta.poll_token)
|
||||
.map(|v| &mut v.inner);
|
||||
let opt_established_ws = connections.get_mut(&meta.poll_token)
|
||||
.and_then(Connection::get_established_ws);
|
||||
|
||||
if let Some(Either::Left(connection)) = opt_inner {
|
||||
if connection.peer_addr != meta.peer_addr {
|
||||
if let Some(established_ws) = opt_established_ws {
|
||||
if established_ws.peer_addr != meta.peer_addr {
|
||||
eprintln!("socket worker: peer socket addrs didn't match");
|
||||
|
||||
continue;
|
||||
|
|
@ -225,7 +219,7 @@ pub fn send_out_messages(
|
|||
|
||||
use ::tungstenite::Error::Io;
|
||||
|
||||
match connection.ws.write_message(out_message.to_ws_message()){
|
||||
match established_ws.ws.write_message(out_message.to_ws_message()){
|
||||
Ok(()) => {},
|
||||
Err(Io(err)) if err.kind() == ErrorKind::WouldBlock => {
|
||||
continue;
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ use std::fs::File;
|
|||
use std::io::Read;
|
||||
use std::time::Instant;
|
||||
|
||||
use either::Either;
|
||||
use mio::Token;
|
||||
use native_tls::{Identity, TlsAcceptor};
|
||||
use net2::{TcpBuilder, unix::UnixTcpBuilderExt};
|
||||
|
|
@ -58,19 +57,6 @@ pub fn create_tls_acceptor(
|
|||
}
|
||||
|
||||
|
||||
pub fn close_connection(connection: &mut Connection){
|
||||
if let Either::Left(ref mut ews) = connection.inner {
|
||||
if ews.ws.can_read(){
|
||||
ews.ws.close(None).unwrap();
|
||||
|
||||
// Needs to be done after ws.close()
|
||||
if let Err(err) = ews.ws.write_pending(){
|
||||
dbg!(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Don't bother with deregistering from Poll. In my understanding, this is
|
||||
/// done automatically when the stream is dropped, as long as there are no
|
||||
|
|
@ -81,7 +67,7 @@ pub fn remove_connection_if_exists(
|
|||
token: Token,
|
||||
){
|
||||
if let Some(mut connection) = connections.remove(&token){
|
||||
close_connection(&mut connection);
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -94,7 +80,7 @@ pub fn remove_inactive_connections(
|
|||
|
||||
connections.retain(|_, connection| {
|
||||
if connection.valid_until.0 < now {
|
||||
close_connection(connection);
|
||||
connection.close();
|
||||
|
||||
println!("closing connection, it is inactive");
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue