diff --git a/aquatic_http/src/lib/network/connection.rs b/aquatic_http/src/lib/network/connection.rs index aa6e6a0..60c9811 100644 --- a/aquatic_http/src/lib/network/connection.rs +++ b/aquatic_http/src/lib/network/connection.rs @@ -4,7 +4,7 @@ use std::io::{Read, Write}; use std::sync::Arc; use hashbrown::HashMap; -use mio::Token; +use mio::{Token, Poll}; use mio::net::TcpStream; use native_tls::{TlsAcceptor, MidHandshakeTlsStream}; @@ -287,6 +287,31 @@ impl Connection { None } } + + pub fn deregister(&mut self, poll: &mut Poll) -> ::std::io::Result<()> { + match &mut self.inner { + ConnectionInner::Established(established) => { + match &mut established.stream { + Stream::TcpStream(ref mut stream) => { + poll.registry().deregister(stream) + }, + Stream::TlsStream(ref mut stream) => { + poll.registry().deregister(stream.get_mut()) + }, + } + }, + ConnectionInner::InProgress(TlsHandshakeMachine { inner, ..}) => { + match inner { + TlsHandshakeMachineInner::TcpStream(ref mut stream) => { + poll.registry().deregister(stream) + }, + TlsHandshakeMachineInner::TlsMidHandshake(ref mut mid_handshake) => { + poll.registry().deregister(mid_handshake.get_mut()) + }, + } + }, + } + } } diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index 7f2ee3c..5aa2085 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -105,6 +105,7 @@ pub fn run_poll_loop( handle_connection_read_event( &config, socket_worker_index, + &mut poll, &request_channel_sender, &mut local_responses, &mut connections, @@ -116,6 +117,7 @@ pub fn run_poll_loop( if !(local_responses.is_empty() & (response_channel_receiver.is_empty())) { send_responses( &config, + &mut poll, &mut response_buffer, local_responses.drain(..), response_channel_receiver.try_iter(), @@ -125,7 +127,7 @@ pub fn run_poll_loop( // Remove inactive connections, but not every iteration if iter_counter % CONNECTION_CLEAN_INTERVAL == 0 { - remove_inactive_connections(&mut connections); + remove_inactive_connections(&mut poll, &mut connections); } iter_counter = iter_counter.wrapping_add(1); @@ -155,7 +157,7 @@ fn accept_new_streams( let token = *poll_token_counter; // Remove connection if it exists (which is unlikely) - connections.remove(&token); + remove_connection(poll, connections, poll_token_counter); poll.registry() .register(&mut stream, token, Interest::READABLE) @@ -186,6 +188,7 @@ fn accept_new_streams( pub fn handle_connection_read_event( config: &Config, socket_worker_index: usize, + poll: &mut Poll, request_channel_sender: &RequestChannelSender, local_responses: &mut Vec<(ConnectionMeta, Response)>, connections: &mut ConnectionMap, @@ -256,14 +259,14 @@ pub fn handle_connection_read_event( Err(RequestReadError::StreamEnded) => { ::log::debug!("stream ended"); - connections.remove(&poll_token); + remove_connection(poll, connections, &poll_token); break }, Err(RequestReadError::Io(err)) => { ::log::info!("error reading request (io): {}", err); - connections.remove(&poll_token); + remove_connection(poll, connections, &poll_token); break; }, @@ -306,6 +309,7 @@ pub fn handle_connection_read_event( /// Read responses from channel, send to peers pub fn send_responses( config: &Config, + poll: &mut Poll, buffer: &mut Cursor<&mut [u8]>, local_responses: Drain<(ConnectionMeta, Response)>, channel_responses: crossbeam_channel::TryIter<(ConnectionMeta, Response)>, @@ -330,7 +334,7 @@ pub fn send_responses( debug!("sent response"); if !config.network.keep_alive { - connections.remove(&meta.poll_token); + remove_connection(poll, connections, &meta.poll_token); } }, Err(err) if err.kind() == ErrorKind::WouldBlock => { @@ -339,7 +343,7 @@ pub fn send_responses( Err(err) => { info!("error sending response: {}", err); - connections.remove(&meta.poll_token); + remove_connection(poll, connections, &meta.poll_token); }, } } @@ -349,13 +353,35 @@ pub fn send_responses( // Close and remove inactive connections pub fn remove_inactive_connections( + poll: &mut Poll, connections: &mut ConnectionMap, ){ let now = Instant::now(); connections.retain(|_, connection| { - connection.valid_until.0 >= now + let keep = connection.valid_until.0 >= now; + + if !keep { + if let Err(err) = connection.deregister(poll){ + ::log::error!("deregister connection error: {}", err); + } + } + + keep }); connections.shrink_to_fit(); } + + +fn remove_connection( + poll: &mut Poll, + connections: &mut ConnectionMap, + connection_token: &Token, +){ + if let Some(mut connection) = connections.remove(connection_token){ + if let Err(err) = connection.deregister(poll){ + ::log::error!("deregister connection error: {}", err); + } + } +} \ No newline at end of file