diff --git a/aquatic_http/src/lib/network/connection.rs b/aquatic_http/src/lib/network/connection.rs index 03def2c..c5ba497 100644 --- a/aquatic_http/src/lib/network/connection.rs +++ b/aquatic_http/src/lib/network/connection.rs @@ -12,15 +12,16 @@ use native_tls::{TlsAcceptor, TlsStream, MidHandshakeTlsStream}; use aquatic_common_tcp::network::stream::Stream; use crate::common::*; -use crate::protocol::{Request, Response}; +use crate::protocol::Request; #[derive(Debug)] -pub enum RequestParseError { +pub enum RequestReadError { NeedMoreData, Invalid, + StreamEnded, Io(::std::io::Error), - Parse(::httparse::Error) + Parse(::httparse::Error), } @@ -44,25 +45,24 @@ impl EstablishedConnection { } } - pub fn parse_request(&mut self) -> Result { + pub fn read_request(&mut self) -> Result { match self.stream.read(&mut self.buf[self.bytes_read..]){ + Ok(0) => { + return Err(RequestReadError::StreamEnded); + } Ok(bytes_read) => { self.bytes_read += bytes_read; info!("parse request read {} bytes", bytes_read); }, Err(err) if err.kind() == ErrorKind::WouldBlock => { - return Err(RequestParseError::NeedMoreData); + return Err(RequestReadError::NeedMoreData); }, Err(err) => { - return Err(RequestParseError::Io(err)); + return Err(RequestReadError::Io(err)); } } - if self.bytes_read == 0 { - return Err(RequestParseError::NeedMoreData); // FIXME: ??? - } - let mut headers = [httparse::EMPTY_HEADER; 16]; let mut request = httparse::Request::new(&mut headers); @@ -71,7 +71,7 @@ impl EstablishedConnection { let result = if let Some(request) = Request::from_http(request){ Ok(request) } else { - Err(RequestParseError::Invalid) + Err(RequestReadError::Invalid) }; self.bytes_read = 0; @@ -79,12 +79,12 @@ impl EstablishedConnection { result }, Ok(httparse::Status::Partial) => { - Err(RequestParseError::NeedMoreData) + Err(RequestReadError::NeedMoreData) }, Err(err) => { self.bytes_read = 0; - Err(RequestParseError::Parse(err)) + Err(RequestReadError::Parse(err)) } }; diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index 2630b69..279bc86 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -1,7 +1,6 @@ pub mod connection; -pub mod utils; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::io::ErrorKind; use hashbrown::HashMap; @@ -17,8 +16,6 @@ use crate::config::Config; use crate::protocol::*; use connection::*; -use utils::*; - fn accept_new_streams( @@ -39,7 +36,8 @@ fn accept_new_streams( let token = *poll_token_counter; - remove_connection_if_exists(connections, token); + // Remove connection if it exists (which is unlikely) + connections.remove(&token); poll.registry() .register(&mut stream, token, Interest::READABLE) @@ -60,37 +58,122 @@ fn accept_new_streams( } } -// will be almost identical to ws version -pub fn run_socket_worker( - config: Config, - socket_worker_index: usize, - socket_worker_statuses: SocketWorkerStatuses, - request_channel_sender: RequestChannelSender, - response_channel_receiver: ResponseChannelReceiver, - opt_tls_acceptor: Option, -){ - match create_listener(config.network.address, config.network.ipv6_only){ - Ok(listener) => { - socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(())); - run_poll_loop( - config, - socket_worker_index, - request_channel_sender, - response_channel_receiver, - listener, - opt_tls_acceptor - ); - }, - Err(err) => { - socket_worker_statuses.lock()[socket_worker_index] = Some( - Err(format!("Couldn't open socket: {:#}", err)) + +/// On the stream given by poll_token, get TLS (if requested) and tungstenite +/// up and running, then read messages and pass on through channel. +pub fn run_handshake_and_read_requests( + socket_worker_index: usize, + request_channel_sender: &RequestChannelSender, + opt_tls_acceptor: &Option, // If set, run TLS + connections: &mut ConnectionMap, + poll_token: Token, + valid_until: ValidUntil, +){ + loop { + if let Some(established_connection) = connections.get_mut(&poll_token) + .and_then(Connection::get_established) + { + match established_connection.read_request(){ + Ok(request) => { + let meta = ConnectionMeta { + worker_index: socket_worker_index, + poll_token, + peer_addr: established_connection.peer_addr + }; + + debug!("read request, sending to handler"); + + if let Err(err) = request_channel_sender + .send((meta, request)) + { + error!( + "RequestChannelSender: couldn't send message: {:?}", + err + ); + } + + break + }, + Err(RequestReadError::NeedMoreData) => { + info!("need more data"); + + break; + }, + Err(err) => { + info!("error reading request: {:?}", err); + + connections.remove(&poll_token); + + break; + }, + } + } else if let Some(connection) = connections.remove(&poll_token){ + let (opt_new_connection, stop_loop) = connection.advance_handshakes( + opt_tls_acceptor, + valid_until ); + + if let Some(connection) = opt_new_connection { + connections.insert(poll_token, connection); + } + + if stop_loop { + break; + } } } } +/// Read messages from channel, send to peers +pub fn send_responses( + response_channel_receiver: ::flume::Drain<(ConnectionMeta, Response)>, + connections: &mut ConnectionMap, +){ + for (meta, response) in response_channel_receiver { + let opt_established = connections.get_mut(&meta.poll_token) + .and_then(Connection::get_established); + + if let Some(established) = opt_established { + if established.peer_addr != meta.peer_addr { + info!("socket worker error: peer socket addrs didn't match"); + + continue; + } + + match established.send_response(&response.to_bytes()){ + Ok(()) => { + debug!("sent response"); + }, + Err(err) if err.kind() == ErrorKind::WouldBlock => { + debug!("send response: would block"); + }, + Err(err) => { + info!("error sending response: {}", err); + + connections.remove(&meta.poll_token); + }, + } + } + } +} + + +// Close and remove inactive connections +pub fn remove_inactive_connections( + connections: &mut ConnectionMap, +){ + let now = Instant::now(); + + connections.retain(|_, connection| { + connection.valid_until.0 >= now + }); + + connections.shrink_to_fit(); +} + + // will be almost identical to ws version pub fn run_poll_loop( config: Config, @@ -161,117 +244,31 @@ pub fn run_poll_loop( } - -/// On the stream given by poll_token, get TLS (if requested) and tungstenite -/// up and running, then read messages and pass on through channel. -pub fn run_handshake_and_read_requests( +pub fn run_socket_worker( + config: Config, socket_worker_index: usize, - request_channel_sender: &RequestChannelSender, - opt_tls_acceptor: &Option, // If set, run TLS - connections: &mut ConnectionMap, - poll_token: Token, - valid_until: ValidUntil, + socket_worker_statuses: SocketWorkerStatuses, + request_channel_sender: RequestChannelSender, + response_channel_receiver: ResponseChannelReceiver, + opt_tls_acceptor: Option, ){ - loop { - if let Some(established_connection) = connections.get_mut(&poll_token) - .and_then(Connection::get_established) - { - match established_connection.parse_request(){ - Ok(request) => { - let meta = ConnectionMeta { - worker_index: socket_worker_index, - poll_token, - peer_addr: established_connection.peer_addr - }; + match create_listener(config.network.address, config.network.ipv6_only){ + Ok(listener) => { + socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(())); - debug!("read request, sending to handler"); - - if let Err(err) = request_channel_sender - .send((meta, request)) - { - error!( - "RequestChannelSender: couldn't send message: {:?}", - err - ); - } - - break - }, - Err(RequestParseError::NeedMoreData) => { - info!("need more data"); - - break; - }, - Err(RequestParseError::Io(err)) => { - info!("error reading request: {}", err); - - remove_connection_if_exists(connections, poll_token); - - break; - }, - Err(e) => { - info!("error reading request: {:?}", e); - - remove_connection_if_exists(connections, poll_token); - - break; - }, - } - } else if let Some(connection) = connections.remove(&poll_token){ - let (opt_new_connection, stop_loop) = connection.advance_handshakes( - opt_tls_acceptor, - valid_until + run_poll_loop( + config, + socket_worker_index, + request_channel_sender, + response_channel_receiver, + listener, + opt_tls_acceptor + ); + }, + Err(err) => { + socket_worker_statuses.lock()[socket_worker_index] = Some( + Err(format!("Couldn't open socket: {:#}", err)) ); - - if let Some(connection) = opt_new_connection { - connections.insert(poll_token, connection); - } - - if stop_loop { - break; - } - } - } -} - - -/// Read messages from channel, send to peers -pub fn send_responses( - response_channel_receiver: ::flume::Drain<(ConnectionMeta, Response)>, - connections: &mut ConnectionMap, -){ - for (meta, response) in response_channel_receiver { - let opt_established = connections.get_mut(&meta.poll_token) - .and_then(Connection::get_established); - - if let Some(established) = opt_established { - if established.peer_addr != meta.peer_addr { - info!("socket worker error: peer socket addrs didn't match"); - - continue; - } - - match established.send_response(&response.to_bytes()){ - Ok(()) => { - debug!("sent response"); - - remove_connection_if_exists( - connections, - meta.poll_token - ); - }, - Err(err) if err.kind() == ErrorKind::WouldBlock => { - debug!("send response: would block"); - }, - Err(err) => { - info!("error sending response: {}", err); - - remove_connection_if_exists( - connections, - meta.poll_token - ); - }, - } } } } \ No newline at end of file diff --git a/aquatic_http/src/lib/network/utils.rs b/aquatic_http/src/lib/network/utils.rs deleted file mode 100644 index 2e07c22..0000000 --- a/aquatic_http/src/lib/network/utils.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::time::Instant; - -use anyhow::Context; -use mio::Token; - -use super::*; - - -/// Don't bother with deregistering from Poll. In my understanding, this is -/// done automatically when the stream is dropped, as long as there are no -/// other references to the file descriptor, such as when it is accessed -/// in multiple threads. -pub fn remove_connection_if_exists( - connections: &mut ConnectionMap, - token: Token, -){ - if let Some(mut connection) = connections.remove(&token){ - // connection.close(); // FIXME - } -} - - -// Close and remove inactive connections -pub fn remove_inactive_connections( - connections: &mut ConnectionMap, -){ - let now = Instant::now(); - - connections.retain(|_, connection| { - if connection.valid_until.0 < now { - // connection.close(); // FIXME - - false - } else { - true - } - }); - - connections.shrink_to_fit(); -} \ No newline at end of file