aquatic_http: keep connection after sending response; other fixes

This commit is contained in:
Joakim Frostegård 2020-07-02 21:14:56 +02:00
parent b86787ef20
commit f73910934b
3 changed files with 146 additions and 189 deletions

View file

@ -12,15 +12,16 @@ use native_tls::{TlsAcceptor, TlsStream, MidHandshakeTlsStream};
use aquatic_common_tcp::network::stream::Stream;
use crate::common::*;
use crate::protocol::{Request, Response};
use crate::protocol::Request;
#[derive(Debug)]
pub enum RequestParseError {
pub enum RequestReadError {
NeedMoreData,
Invalid,
StreamEnded,
Io(::std::io::Error),
Parse(::httparse::Error)
Parse(::httparse::Error),
}
@ -44,25 +45,24 @@ impl EstablishedConnection {
}
}
pub fn parse_request(&mut self) -> Result<Request, RequestParseError> {
pub fn read_request(&mut self) -> Result<Request, RequestReadError> {
match self.stream.read(&mut self.buf[self.bytes_read..]){
Ok(0) => {
return Err(RequestReadError::StreamEnded);
}
Ok(bytes_read) => {
self.bytes_read += bytes_read;
info!("parse request read {} bytes", bytes_read);
},
Err(err) if err.kind() == ErrorKind::WouldBlock => {
return Err(RequestParseError::NeedMoreData);
return Err(RequestReadError::NeedMoreData);
},
Err(err) => {
return Err(RequestParseError::Io(err));
return Err(RequestReadError::Io(err));
}
}
if self.bytes_read == 0 {
return Err(RequestParseError::NeedMoreData); // FIXME: ???
}
let mut headers = [httparse::EMPTY_HEADER; 16];
let mut request = httparse::Request::new(&mut headers);
@ -71,7 +71,7 @@ impl EstablishedConnection {
let result = if let Some(request) = Request::from_http(request){
Ok(request)
} else {
Err(RequestParseError::Invalid)
Err(RequestReadError::Invalid)
};
self.bytes_read = 0;
@ -79,12 +79,12 @@ impl EstablishedConnection {
result
},
Ok(httparse::Status::Partial) => {
Err(RequestParseError::NeedMoreData)
Err(RequestReadError::NeedMoreData)
},
Err(err) => {
self.bytes_read = 0;
Err(RequestParseError::Parse(err))
Err(RequestReadError::Parse(err))
}
};

View file

@ -1,7 +1,6 @@
pub mod connection;
pub mod utils;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::io::ErrorKind;
use hashbrown::HashMap;
@ -17,8 +16,6 @@ use crate::config::Config;
use crate::protocol::*;
use connection::*;
use utils::*;
fn accept_new_streams(
@ -39,7 +36,8 @@ fn accept_new_streams(
let token = *poll_token_counter;
remove_connection_if_exists(connections, token);
// Remove connection if it exists (which is unlikely)
connections.remove(&token);
poll.registry()
.register(&mut stream, token, Interest::READABLE)
@ -60,37 +58,122 @@ fn accept_new_streams(
}
}
// will be almost identical to ws version
pub fn run_socket_worker(
config: Config,
socket_worker_index: usize,
socket_worker_statuses: SocketWorkerStatuses,
request_channel_sender: RequestChannelSender,
response_channel_receiver: ResponseChannelReceiver,
opt_tls_acceptor: Option<TlsAcceptor>,
){
match create_listener(config.network.address, config.network.ipv6_only){
Ok(listener) => {
socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(()));
run_poll_loop(
config,
socket_worker_index,
request_channel_sender,
response_channel_receiver,
listener,
opt_tls_acceptor
);
},
Err(err) => {
socket_worker_statuses.lock()[socket_worker_index] = Some(
Err(format!("Couldn't open socket: {:#}", err))
/// On the stream given by poll_token, get TLS (if requested) and tungstenite
/// up and running, then read messages and pass on through channel.
pub fn run_handshake_and_read_requests(
socket_worker_index: usize,
request_channel_sender: &RequestChannelSender,
opt_tls_acceptor: &Option<TlsAcceptor>, // If set, run TLS
connections: &mut ConnectionMap,
poll_token: Token,
valid_until: ValidUntil,
){
loop {
if let Some(established_connection) = connections.get_mut(&poll_token)
.and_then(Connection::get_established)
{
match established_connection.read_request(){
Ok(request) => {
let meta = ConnectionMeta {
worker_index: socket_worker_index,
poll_token,
peer_addr: established_connection.peer_addr
};
debug!("read request, sending to handler");
if let Err(err) = request_channel_sender
.send((meta, request))
{
error!(
"RequestChannelSender: couldn't send message: {:?}",
err
);
}
break
},
Err(RequestReadError::NeedMoreData) => {
info!("need more data");
break;
},
Err(err) => {
info!("error reading request: {:?}", err);
connections.remove(&poll_token);
break;
},
}
} else if let Some(connection) = connections.remove(&poll_token){
let (opt_new_connection, stop_loop) = connection.advance_handshakes(
opt_tls_acceptor,
valid_until
);
if let Some(connection) = opt_new_connection {
connections.insert(poll_token, connection);
}
if stop_loop {
break;
}
}
}
}
/// Read messages from channel, send to peers
pub fn send_responses(
response_channel_receiver: ::flume::Drain<(ConnectionMeta, Response)>,
connections: &mut ConnectionMap,
){
for (meta, response) in response_channel_receiver {
let opt_established = connections.get_mut(&meta.poll_token)
.and_then(Connection::get_established);
if let Some(established) = opt_established {
if established.peer_addr != meta.peer_addr {
info!("socket worker error: peer socket addrs didn't match");
continue;
}
match established.send_response(&response.to_bytes()){
Ok(()) => {
debug!("sent response");
},
Err(err) if err.kind() == ErrorKind::WouldBlock => {
debug!("send response: would block");
},
Err(err) => {
info!("error sending response: {}", err);
connections.remove(&meta.poll_token);
},
}
}
}
}
// Close and remove inactive connections
pub fn remove_inactive_connections(
connections: &mut ConnectionMap,
){
let now = Instant::now();
connections.retain(|_, connection| {
connection.valid_until.0 >= now
});
connections.shrink_to_fit();
}
// will be almost identical to ws version
pub fn run_poll_loop(
config: Config,
@ -161,117 +244,31 @@ pub fn run_poll_loop(
}
/// On the stream given by poll_token, get TLS (if requested) and tungstenite
/// up and running, then read messages and pass on through channel.
pub fn run_handshake_and_read_requests(
pub fn run_socket_worker(
config: Config,
socket_worker_index: usize,
request_channel_sender: &RequestChannelSender,
opt_tls_acceptor: &Option<TlsAcceptor>, // If set, run TLS
connections: &mut ConnectionMap,
poll_token: Token,
valid_until: ValidUntil,
socket_worker_statuses: SocketWorkerStatuses,
request_channel_sender: RequestChannelSender,
response_channel_receiver: ResponseChannelReceiver,
opt_tls_acceptor: Option<TlsAcceptor>,
){
loop {
if let Some(established_connection) = connections.get_mut(&poll_token)
.and_then(Connection::get_established)
{
match established_connection.parse_request(){
Ok(request) => {
let meta = ConnectionMeta {
worker_index: socket_worker_index,
poll_token,
peer_addr: established_connection.peer_addr
};
match create_listener(config.network.address, config.network.ipv6_only){
Ok(listener) => {
socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(()));
debug!("read request, sending to handler");
if let Err(err) = request_channel_sender
.send((meta, request))
{
error!(
"RequestChannelSender: couldn't send message: {:?}",
err
);
}
break
},
Err(RequestParseError::NeedMoreData) => {
info!("need more data");
break;
},
Err(RequestParseError::Io(err)) => {
info!("error reading request: {}", err);
remove_connection_if_exists(connections, poll_token);
break;
},
Err(e) => {
info!("error reading request: {:?}", e);
remove_connection_if_exists(connections, poll_token);
break;
},
}
} else if let Some(connection) = connections.remove(&poll_token){
let (opt_new_connection, stop_loop) = connection.advance_handshakes(
opt_tls_acceptor,
valid_until
run_poll_loop(
config,
socket_worker_index,
request_channel_sender,
response_channel_receiver,
listener,
opt_tls_acceptor
);
},
Err(err) => {
socket_worker_statuses.lock()[socket_worker_index] = Some(
Err(format!("Couldn't open socket: {:#}", err))
);
if let Some(connection) = opt_new_connection {
connections.insert(poll_token, connection);
}
if stop_loop {
break;
}
}
}
}
/// Read messages from channel, send to peers
pub fn send_responses(
response_channel_receiver: ::flume::Drain<(ConnectionMeta, Response)>,
connections: &mut ConnectionMap,
){
for (meta, response) in response_channel_receiver {
let opt_established = connections.get_mut(&meta.poll_token)
.and_then(Connection::get_established);
if let Some(established) = opt_established {
if established.peer_addr != meta.peer_addr {
info!("socket worker error: peer socket addrs didn't match");
continue;
}
match established.send_response(&response.to_bytes()){
Ok(()) => {
debug!("sent response");
remove_connection_if_exists(
connections,
meta.poll_token
);
},
Err(err) if err.kind() == ErrorKind::WouldBlock => {
debug!("send response: would block");
},
Err(err) => {
info!("error sending response: {}", err);
remove_connection_if_exists(
connections,
meta.poll_token
);
},
}
}
}
}

View file

@ -1,40 +0,0 @@
use std::time::Instant;
use anyhow::Context;
use mio::Token;
use super::*;
/// Don't bother with deregistering from Poll. In my understanding, this is
/// done automatically when the stream is dropped, as long as there are no
/// other references to the file descriptor, such as when it is accessed
/// in multiple threads.
pub fn remove_connection_if_exists(
connections: &mut ConnectionMap,
token: Token,
){
if let Some(mut connection) = connections.remove(&token){
// connection.close(); // FIXME
}
}
// Close and remove inactive connections
pub fn remove_inactive_connections(
connections: &mut ConnectionMap,
){
let now = Instant::now();
connections.retain(|_, connection| {
if connection.valid_until.0 < now {
// connection.close(); // FIXME
false
} else {
true
}
});
connections.shrink_to_fit();
}