aquatic_http: deregister connections before removing them

This commit is contained in:
Joakim Frostegård 2020-08-09 02:30:40 +02:00
parent 1b2b4f0eb5
commit d9282defbc
2 changed files with 59 additions and 8 deletions

View file

@ -4,7 +4,7 @@ use std::io::{Read, Write};
use std::sync::Arc; use std::sync::Arc;
use hashbrown::HashMap; use hashbrown::HashMap;
use mio::Token; use mio::{Token, Poll};
use mio::net::TcpStream; use mio::net::TcpStream;
use native_tls::{TlsAcceptor, MidHandshakeTlsStream}; use native_tls::{TlsAcceptor, MidHandshakeTlsStream};
@ -287,6 +287,31 @@ impl Connection {
None None
} }
} }
pub fn deregister(&mut self, poll: &mut Poll) -> ::std::io::Result<()> {
match &mut self.inner {
ConnectionInner::Established(established) => {
match &mut established.stream {
Stream::TcpStream(ref mut stream) => {
poll.registry().deregister(stream)
},
Stream::TlsStream(ref mut stream) => {
poll.registry().deregister(stream.get_mut())
},
}
},
ConnectionInner::InProgress(TlsHandshakeMachine { inner, ..}) => {
match inner {
TlsHandshakeMachineInner::TcpStream(ref mut stream) => {
poll.registry().deregister(stream)
},
TlsHandshakeMachineInner::TlsMidHandshake(ref mut mid_handshake) => {
poll.registry().deregister(mid_handshake.get_mut())
},
}
},
}
}
} }

View file

@ -105,6 +105,7 @@ pub fn run_poll_loop(
handle_connection_read_event( handle_connection_read_event(
&config, &config,
socket_worker_index, socket_worker_index,
&mut poll,
&request_channel_sender, &request_channel_sender,
&mut local_responses, &mut local_responses,
&mut connections, &mut connections,
@ -116,6 +117,7 @@ pub fn run_poll_loop(
if !(local_responses.is_empty() & (response_channel_receiver.is_empty())) { if !(local_responses.is_empty() & (response_channel_receiver.is_empty())) {
send_responses( send_responses(
&config, &config,
&mut poll,
&mut response_buffer, &mut response_buffer,
local_responses.drain(..), local_responses.drain(..),
response_channel_receiver.try_iter(), response_channel_receiver.try_iter(),
@ -125,7 +127,7 @@ pub fn run_poll_loop(
// Remove inactive connections, but not every iteration // Remove inactive connections, but not every iteration
if iter_counter % CONNECTION_CLEAN_INTERVAL == 0 { if iter_counter % CONNECTION_CLEAN_INTERVAL == 0 {
remove_inactive_connections(&mut connections); remove_inactive_connections(&mut poll, &mut connections);
} }
iter_counter = iter_counter.wrapping_add(1); iter_counter = iter_counter.wrapping_add(1);
@ -155,7 +157,7 @@ fn accept_new_streams(
let token = *poll_token_counter; let token = *poll_token_counter;
// Remove connection if it exists (which is unlikely) // Remove connection if it exists (which is unlikely)
connections.remove(&token); remove_connection(poll, connections, poll_token_counter);
poll.registry() poll.registry()
.register(&mut stream, token, Interest::READABLE) .register(&mut stream, token, Interest::READABLE)
@ -186,6 +188,7 @@ fn accept_new_streams(
pub fn handle_connection_read_event( pub fn handle_connection_read_event(
config: &Config, config: &Config,
socket_worker_index: usize, socket_worker_index: usize,
poll: &mut Poll,
request_channel_sender: &RequestChannelSender, request_channel_sender: &RequestChannelSender,
local_responses: &mut Vec<(ConnectionMeta, Response)>, local_responses: &mut Vec<(ConnectionMeta, Response)>,
connections: &mut ConnectionMap, connections: &mut ConnectionMap,
@ -256,14 +259,14 @@ pub fn handle_connection_read_event(
Err(RequestReadError::StreamEnded) => { Err(RequestReadError::StreamEnded) => {
::log::debug!("stream ended"); ::log::debug!("stream ended");
connections.remove(&poll_token); remove_connection(poll, connections, &poll_token);
break break
}, },
Err(RequestReadError::Io(err)) => { Err(RequestReadError::Io(err)) => {
::log::info!("error reading request (io): {}", err); ::log::info!("error reading request (io): {}", err);
connections.remove(&poll_token); remove_connection(poll, connections, &poll_token);
break; break;
}, },
@ -306,6 +309,7 @@ pub fn handle_connection_read_event(
/// Read responses from channel, send to peers /// Read responses from channel, send to peers
pub fn send_responses( pub fn send_responses(
config: &Config, config: &Config,
poll: &mut Poll,
buffer: &mut Cursor<&mut [u8]>, buffer: &mut Cursor<&mut [u8]>,
local_responses: Drain<(ConnectionMeta, Response)>, local_responses: Drain<(ConnectionMeta, Response)>,
channel_responses: crossbeam_channel::TryIter<(ConnectionMeta, Response)>, channel_responses: crossbeam_channel::TryIter<(ConnectionMeta, Response)>,
@ -330,7 +334,7 @@ pub fn send_responses(
debug!("sent response"); debug!("sent response");
if !config.network.keep_alive { if !config.network.keep_alive {
connections.remove(&meta.poll_token); remove_connection(poll, connections, &meta.poll_token);
} }
}, },
Err(err) if err.kind() == ErrorKind::WouldBlock => { Err(err) if err.kind() == ErrorKind::WouldBlock => {
@ -339,7 +343,7 @@ pub fn send_responses(
Err(err) => { Err(err) => {
info!("error sending response: {}", err); info!("error sending response: {}", err);
connections.remove(&meta.poll_token); remove_connection(poll, connections, &meta.poll_token);
}, },
} }
} }
@ -349,13 +353,35 @@ pub fn send_responses(
// Close and remove inactive connections // Close and remove inactive connections
pub fn remove_inactive_connections( pub fn remove_inactive_connections(
poll: &mut Poll,
connections: &mut ConnectionMap, connections: &mut ConnectionMap,
){ ){
let now = Instant::now(); let now = Instant::now();
connections.retain(|_, connection| { connections.retain(|_, connection| {
connection.valid_until.0 >= now let keep = connection.valid_until.0 >= now;
if !keep {
if let Err(err) = connection.deregister(poll){
::log::error!("deregister connection error: {}", err);
}
}
keep
}); });
connections.shrink_to_fit(); connections.shrink_to_fit();
} }
fn remove_connection(
poll: &mut Poll,
connections: &mut ConnectionMap,
connection_token: &Token,
){
if let Some(mut connection) = connections.remove(connection_token){
if let Err(err) = connection.deregister(poll){
::log::error!("deregister connection error: {}", err);
}
}
}