mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-01 10:15:31 +00:00
aquatic ws: split network code into separate functions
This commit is contained in:
parent
5b58db90e3
commit
e6a4bca04c
1 changed files with 147 additions and 100 deletions
|
|
@ -16,7 +16,7 @@ use crate::protocol::*;
|
||||||
pub struct PeerConnection {
|
pub struct PeerConnection {
|
||||||
pub ws: WebSocket<TcpStream>,
|
pub ws: WebSocket<TcpStream>,
|
||||||
pub peer_socket_addr: SocketAddr,
|
pub peer_socket_addr: SocketAddr,
|
||||||
pub valid_until: Instant,
|
pub valid_until: ValidUntil,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -33,9 +33,9 @@ pub fn run_socket_worker(
|
||||||
.register(&mut listener, Token(0), Interest::READABLE)
|
.register(&mut listener, Token(0), Interest::READABLE)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let mut events = Events::with_capacity(1024);
|
let mut events = Events::with_capacity(1024); // FIXME: config
|
||||||
|
|
||||||
let timeout = Duration::from_millis(50);
|
let timeout = Duration::from_millis(50); // FIXME: config
|
||||||
|
|
||||||
let mut connections: Slab<Option<PeerConnection>> = Slab::new();
|
let mut connections: Slab<Option<PeerConnection>> = Slab::new();
|
||||||
|
|
||||||
|
|
@ -46,12 +46,80 @@ pub fn run_socket_worker(
|
||||||
poll.poll(&mut events, Some(timeout))
|
poll.poll(&mut events, Some(timeout))
|
||||||
.expect("failed polling");
|
.expect("failed polling");
|
||||||
|
|
||||||
let valid_until = Instant::now() + Duration::from_secs(600);
|
let valid_until = ValidUntil::new(600);
|
||||||
|
|
||||||
for event in events.iter(){
|
for event in events.iter(){
|
||||||
let token = event.token();
|
let token = event.token();
|
||||||
|
|
||||||
if token.0 == 0 {
|
if token.0 == 0 {
|
||||||
|
accept_new_connections(
|
||||||
|
&mut listener,
|
||||||
|
&mut poll,
|
||||||
|
&mut connections,
|
||||||
|
valid_until
|
||||||
|
);
|
||||||
|
} else if event.is_readable(){
|
||||||
|
read_and_forward_in_messages(
|
||||||
|
socket_worker_index,
|
||||||
|
&in_message_sender,
|
||||||
|
&mut poll,
|
||||||
|
&mut connections,
|
||||||
|
token,
|
||||||
|
valid_until
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
// Close connections after some time of inactivity and write pending
|
||||||
|
// messages (which is required after closing anyway.)
|
||||||
|
//
|
||||||
|
// FIXME: peers need to be removed too, wherever they are stored
|
||||||
|
connections.retain(|_, opt_connection| {
|
||||||
|
if let Some(connection) = opt_connection {
|
||||||
|
if connection.valid_until.0 < now {
|
||||||
|
connection.ws.close(None).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match connection.ws.write_pending(){
|
||||||
|
Err(tungstenite::Error::Io(err)) => {
|
||||||
|
if err.kind() == ErrorKind::WouldBlock {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(tungstenite::Error::ConnectionClosed) => {
|
||||||
|
// FIXME: necessary?
|
||||||
|
poll.registry()
|
||||||
|
.deregister(connection.ws.get_mut())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
return false;
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
true
|
||||||
|
});
|
||||||
|
|
||||||
|
send_out_messages(
|
||||||
|
out_message_receiver.drain(),
|
||||||
|
&mut poll,
|
||||||
|
&mut connections
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
fn accept_new_connections(
|
||||||
|
listener: &mut TcpListener,
|
||||||
|
poll: &mut Poll,
|
||||||
|
connections: &mut Slab<Option<PeerConnection>>,
|
||||||
|
valid_until: ValidUntil,
|
||||||
|
){
|
||||||
loop {
|
loop {
|
||||||
match listener.accept(){
|
match listener.accept(){
|
||||||
Ok((mut stream, src)) => {
|
Ok((mut stream, src)) => {
|
||||||
|
|
@ -84,15 +152,25 @@ pub fn run_socket_worker(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if event.is_readable(){
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn read_and_forward_in_messages(
|
||||||
|
socket_worker_index: usize,
|
||||||
|
in_message_sender: &InMessageSender,
|
||||||
|
poll: &mut Poll,
|
||||||
|
connections: &mut Slab<Option<PeerConnection>>,
|
||||||
|
poll_token: Token,
|
||||||
|
valid_until: ValidUntil,
|
||||||
|
){
|
||||||
loop {
|
loop {
|
||||||
if let Some(Some(connection)) = connections.get_mut(token.0){
|
if let Some(Some(connection)) = connections.get_mut(poll_token.0){
|
||||||
match connection.ws.read_message(){
|
match connection.ws.read_message(){
|
||||||
Ok(ws_message) => {
|
Ok(ws_message) => {
|
||||||
if let Some(in_message) = InMessage::from_ws_message(ws_message){
|
if let Some(in_message) = InMessage::from_ws_message(ws_message){
|
||||||
let meta = ConnectionMeta {
|
let meta = ConnectionMeta {
|
||||||
socket_worker_index,
|
socket_worker_index,
|
||||||
socket_worker_slab_index: token.0,
|
socket_worker_slab_index: poll_token.0,
|
||||||
peer_socket_addr: connection.peer_socket_addr
|
peer_socket_addr: connection.peer_socket_addr
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -114,7 +192,7 @@ pub fn run_socket_worker(
|
||||||
.deregister(connection.ws.get_mut())
|
.deregister(connection.ws.get_mut())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
connections.remove(token.0);
|
connections.remove(poll_token.0);
|
||||||
},
|
},
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprint!("{}", err);
|
eprint!("{}", err);
|
||||||
|
|
@ -122,46 +200,16 @@ pub fn run_socket_worker(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
let now = Instant::now();
|
|
||||||
|
|
||||||
// Close connections after some time of inactivity and write pending
|
|
||||||
// messages (which is required after closing anyway.)
|
|
||||||
//
|
|
||||||
// FIXME: peers need to be removed too, wherever they are stored
|
|
||||||
connections.retain(|_, opt_connection| {
|
|
||||||
if let Some(connection) = opt_connection {
|
|
||||||
if connection.valid_until < now {
|
|
||||||
connection.ws.close(None).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match connection.ws.write_pending(){
|
|
||||||
Err(tungstenite::Error::Io(err)) => {
|
|
||||||
if err.kind() == ErrorKind::WouldBlock {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(tungstenite::Error::ConnectionClosed) => {
|
|
||||||
// FIXME: necessary?
|
|
||||||
poll.registry()
|
|
||||||
.deregister(connection.ws.get_mut())
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
return false;
|
|
||||||
},
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
true
|
|
||||||
});
|
|
||||||
|
|
||||||
|
pub fn send_out_messages(
|
||||||
|
out_message_receiver: ::flume::Drain<(ConnectionMeta, OutMessage)>,
|
||||||
|
poll: &mut Poll,
|
||||||
|
connections: &mut Slab<Option<PeerConnection>>,
|
||||||
|
){
|
||||||
// Read messages from channel, send to peers
|
// Read messages from channel, send to peers
|
||||||
for (meta, out_message) in out_message_receiver.drain(){
|
for (meta, out_message) in out_message_receiver {
|
||||||
let opt_connection = connections
|
let opt_connection = connections
|
||||||
.get_mut(meta.socket_worker_slab_index);
|
.get_mut(meta.socket_worker_slab_index);
|
||||||
|
|
||||||
|
|
@ -195,5 +243,4 @@ pub fn run_socket_worker(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue