mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-01 10:15:31 +00:00
aquatic_ws: network: remove inactive connections
This commit is contained in:
parent
87290f4289
commit
9fc9f2fd9c
1 changed files with 63 additions and 46 deletions
|
|
@ -1,7 +1,6 @@
|
||||||
use std::net::{SocketAddr};
|
use std::net::{SocketAddr};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use std::io::ErrorKind;
|
use std::io::ErrorKind;
|
||||||
use std::option::Option;
|
|
||||||
|
|
||||||
use tungstenite::WebSocket;
|
use tungstenite::WebSocket;
|
||||||
use tungstenite::handshake::{MidHandshake, HandshakeError, server::{ServerHandshake, NoCallback}};
|
use tungstenite::handshake::{MidHandshake, HandshakeError, server::{ServerHandshake, NoCallback}};
|
||||||
|
|
@ -15,7 +14,7 @@ use crate::protocol::*;
|
||||||
|
|
||||||
|
|
||||||
pub struct Connection {
|
pub struct Connection {
|
||||||
valid_until: Option<ValidUntil>,
|
valid_until: ValidUntil,
|
||||||
stage: ConnectionStage,
|
stage: ConnectionStage,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -54,32 +53,78 @@ impl ::tungstenite::handshake::server::Callback for DebugCallback {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Close and remove inactive connections
|
||||||
|
pub fn remove_inactive_connections(
|
||||||
|
poll: &mut Poll,
|
||||||
|
connections: &mut ConnectionMap,
|
||||||
|
){
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
connections.retain(|_, connection| {
|
||||||
|
if connection.valid_until.0 < now {
|
||||||
|
match connection.stage {
|
||||||
|
ConnectionStage::Stream(ref mut stream) => {
|
||||||
|
poll.registry()
|
||||||
|
.deregister(stream)
|
||||||
|
.unwrap();
|
||||||
|
},
|
||||||
|
ConnectionStage::MidHandshake(ref mut handshake) => {
|
||||||
|
poll.registry()
|
||||||
|
.deregister(handshake.get_mut().get_mut())
|
||||||
|
.unwrap();
|
||||||
|
},
|
||||||
|
ConnectionStage::Established(ref mut peer_connection) => {
|
||||||
|
peer_connection.ws.close(None).unwrap();
|
||||||
|
|
||||||
|
// Needs to be done after ws.close()
|
||||||
|
if let Err(err) = peer_connection.ws.write_pending(){
|
||||||
|
dbg!(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
poll.registry()
|
||||||
|
.deregister(peer_connection.ws.get_mut())
|
||||||
|
.unwrap();
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("closing connection, it is inactive");
|
||||||
|
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
println!("keeping connection, it is still active");
|
||||||
|
|
||||||
|
true
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
pub fn run_socket_worker(
|
pub fn run_socket_worker(
|
||||||
address: SocketAddr,
|
address: SocketAddr,
|
||||||
socket_worker_index: usize,
|
socket_worker_index: usize,
|
||||||
in_message_sender: InMessageSender,
|
in_message_sender: InMessageSender,
|
||||||
out_message_receiver: OutMessageReceiver,
|
out_message_receiver: OutMessageReceiver,
|
||||||
){
|
){
|
||||||
|
let poll_timeout = Duration::from_millis(50); // FIXME: config
|
||||||
|
|
||||||
let mut listener = TcpListener::bind(address).unwrap();
|
let mut listener = TcpListener::bind(address).unwrap();
|
||||||
let mut poll = Poll::new().expect("create poll");
|
let mut poll = Poll::new().expect("create poll");
|
||||||
|
let mut events = Events::with_capacity(1024); // FIXME: config
|
||||||
|
|
||||||
poll.registry()
|
poll.registry()
|
||||||
.register(&mut listener, Token(0), Interest::READABLE)
|
.register(&mut listener, Token(0), Interest::READABLE)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let mut events = Events::with_capacity(1024); // FIXME: config
|
|
||||||
|
|
||||||
let timeout = Duration::from_millis(50); // FIXME: config
|
|
||||||
|
|
||||||
let mut connections: ConnectionMap = HashMap::new();
|
let mut connections: ConnectionMap = HashMap::new();
|
||||||
|
|
||||||
let mut poll_token_counter = Token(0usize);
|
let mut poll_token_counter = Token(0usize);
|
||||||
|
let mut iter_counter = 0usize;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
poll.poll(&mut events, Some(timeout))
|
poll.poll(&mut events, Some(poll_timeout))
|
||||||
.expect("failed polling");
|
.expect("failed polling");
|
||||||
|
|
||||||
let valid_until = ValidUntil::new(600);
|
let valid_until = ValidUntil::new(600); // FIXME: config
|
||||||
|
|
||||||
for event in events.iter(){
|
for event in events.iter(){
|
||||||
let token = event.token();
|
let token = event.token();
|
||||||
|
|
@ -104,46 +149,18 @@ pub fn run_socket_worker(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let now = Instant::now();
|
|
||||||
|
|
||||||
// Close connections after some time of inactivity and write pending
|
|
||||||
// messages (which is required after closing anyway.)
|
|
||||||
//
|
|
||||||
// FIXME: peers need to be removed too, wherever they are stored
|
|
||||||
/* connections.retain(|_, opt_connection| {
|
|
||||||
if let Some(connection) = opt_connection {
|
|
||||||
if connection.valid_until.0 < now {
|
|
||||||
connection.ws.close(None).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match connection.ws.write_pending(){
|
|
||||||
Err(tungstenite::Error::Io(err)) => {
|
|
||||||
if err.kind() == ErrorKind::WouldBlock {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(tungstenite::Error::ConnectionClosed) => {
|
|
||||||
// FIXME: necessary?
|
|
||||||
poll.registry()
|
|
||||||
.deregister(connection.ws.get_mut())
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
return false;
|
|
||||||
},
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
true
|
|
||||||
}); */
|
|
||||||
|
|
||||||
send_out_messages(
|
send_out_messages(
|
||||||
out_message_receiver.drain(),
|
out_message_receiver.drain(),
|
||||||
&mut poll,
|
&mut poll,
|
||||||
&mut connections
|
&mut connections
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Remove inactive connections, but not every iteration
|
||||||
|
if iter_counter % 128 == 0 {
|
||||||
|
remove_inactive_connections(&mut poll, &mut connections);
|
||||||
|
}
|
||||||
|
|
||||||
|
iter_counter = iter_counter.wrapping_add(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -203,7 +220,7 @@ fn accept_new_streams(
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let connection = Connection {
|
let connection = Connection {
|
||||||
valid_until: Some(valid_until),
|
valid_until,
|
||||||
stage: ConnectionStage::Stream(stream)
|
stage: ConnectionStage::Stream(stream)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -240,7 +257,7 @@ pub fn handle_handshake_result(
|
||||||
};
|
};
|
||||||
|
|
||||||
let connection = Connection {
|
let connection = Connection {
|
||||||
valid_until: Some(valid_until),
|
valid_until,
|
||||||
stage: ConnectionStage::Established(peer_connection)
|
stage: ConnectionStage::Established(peer_connection)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -252,7 +269,7 @@ pub fn handle_handshake_result(
|
||||||
println!("interrupted");
|
println!("interrupted");
|
||||||
|
|
||||||
let connection = Connection {
|
let connection = Connection {
|
||||||
valid_until: Some(valid_until),
|
valid_until,
|
||||||
stage: ConnectionStage::MidHandshake(handshake),
|
stage: ConnectionStage::MidHandshake(handshake),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue