WIP: more work on aquatic_http

This commit is contained in:
Joakim Frostegård 2020-07-02 00:39:50 +02:00
parent 404e528616
commit 76079cf66e
7 changed files with 211 additions and 183 deletions

View file

@ -1,5 +1,6 @@
use std::net::{SocketAddr};
use std::io::{Read, Write};
use std::io::ErrorKind;
use either::Either;
use hashbrown::HashMap;
@ -7,12 +8,9 @@ use log::info;
use mio::Token;
use mio::net::TcpStream;
use native_tls::{TlsAcceptor, TlsStream, MidHandshakeTlsStream};
use tungstenite::WebSocket;
use tungstenite::handshake::{MidHandshake, HandshakeError, server::NoCallback};
use tungstenite::server::{ServerHandshake};
use tungstenite::protocol::WebSocketConfig;
use crate::common::*;
use crate::protocol::{Request, Response};
pub enum Stream {
@ -86,15 +84,100 @@ impl Write for Stream {
}
enum HandshakeMachine {
TcpStream(TcpStream),
TlsStream(TlsStream<TcpStream>),
TlsMidHandshake(MidHandshakeTlsStream<TcpStream>),
WsMidHandshake(MidHandshake<ServerHandshake<Stream, NoCallback>>),
#[derive(Debug)]
pub enum RequestParseError {
NeedMoreData,
Invalid,
Incomplete,
Io(::std::io::Error),
Parse(::httparse::Error)
}
impl HandshakeMachine {
pub struct EstablishedConnection {
stream: Stream,
pub peer_addr: SocketAddr,
buf: Vec<u8>,
}
impl EstablishedConnection {
fn new(stream: Stream) -> Self {
let peer_addr = stream.get_peer_addr();
Self {
stream,
peer_addr,
buf: Vec::new(), // FIXME: with capacity of like 100?
}
}
pub fn parse_request(&mut self) -> Result<Request, RequestParseError> {
match self.stream.read(&mut self.buf){
Ok(0) => {
// FIXME: finished reading completely here?
},
Ok(_) => {
return Err(RequestParseError::NeedMoreData);
},
Err(err) if err.kind() == ErrorKind::WouldBlock => {
return Err(RequestParseError::NeedMoreData);
},
Err(err) => {
return Err(RequestParseError::Io(err));
}
}
let mut headers = [httparse::EMPTY_HEADER; 1];
let mut request = httparse::Request::new(&mut headers);
let request = match request.parse(&self.buf){
Ok(httparse::Status::Complete(_)) => {
if let Some(request) = Request::from_http(request){
Ok(request)
} else {
Err(RequestParseError::Invalid)
}
},
Ok(httparse::Status::Partial) => {
Err(RequestParseError::Incomplete)
},
Err(err) => {
Err(RequestParseError::Parse(err))
}
};
self.buf.clear();
self.buf.shrink_to_fit();
request
}
pub fn send_response(&mut self, body: &str) -> Result<(), RequestParseError> {
let mut response = String::new();
response.push_str("200 OK\r\n\r\n");
response.push_str(body);
match self.stream.write(response.as_bytes()){
Ok(_) => Ok(()),
Err(err) => {
info!("send response: {:?}", err);
Err(RequestParseError::Io(err))
}
}
}
}
enum HandshakeMachine {
TcpStream(TcpStream),
TlsMidHandshake(MidHandshakeTlsStream<TcpStream>),
}
impl <'a>HandshakeMachine {
#[inline]
fn new(tcp_stream: TcpStream) -> Self {
Self::TcpStream(tcp_stream)
@ -103,9 +186,8 @@ impl HandshakeMachine {
#[inline]
fn advance(
self,
ws_config: WebSocketConfig,
opt_tls_acceptor: &Option<TlsAcceptor>, // If set, run TLS
) -> (Option<Either<EstablishedWs, Self>>, bool) { // bool = stop looping
) -> (Option<Either<EstablishedConnection, Self>>, bool) { // bool = stop looping
match self {
HandshakeMachine::TcpStream(stream) => {
if let Some(tls_acceptor) = opt_tls_acceptor {
@ -113,37 +195,22 @@ impl HandshakeMachine {
tls_acceptor.accept(stream)
)
} else {
let handshake_result = ::tungstenite::server::accept_with_config(
Stream::TcpStream(stream),
Some(ws_config)
);
Self::handle_ws_handshake_result(handshake_result)
(Some(Either::Left(EstablishedConnection::new(Stream::TcpStream(stream)))), false)
}
},
HandshakeMachine::TlsStream(stream) => {
let handshake_result = ::tungstenite::server::accept(
Stream::TlsStream(stream),
);
Self::handle_ws_handshake_result(handshake_result)
},
HandshakeMachine::TlsMidHandshake(handshake) => {
Self::handle_tls_handshake_result(handshake.handshake())
},
HandshakeMachine::WsMidHandshake(handshake) => {
Self::handle_ws_handshake_result(handshake.handshake())
},
}
}
#[inline]
fn handle_tls_handshake_result(
result: Result<TlsStream<TcpStream>, ::native_tls::HandshakeError<TcpStream>>,
) -> (Option<Either<EstablishedWs, Self>>, bool) {
) -> (Option<Either<EstablishedConnection, Self>>, bool) {
match result {
Ok(stream) => {
(Some(Either::Right(Self::TlsStream(stream))), false)
(Some(Either::Left(EstablishedConnection::new(Stream::TlsStream(stream)))), false)
},
Err(native_tls::HandshakeError::WouldBlock(handshake)) => {
(Some(Either::Right(Self::TlsMidHandshake(handshake))), true)
@ -155,76 +222,38 @@ impl HandshakeMachine {
}
}
}
#[inline]
fn handle_ws_handshake_result(
result: Result<WebSocket<Stream>, HandshakeError<ServerHandshake<Stream, NoCallback>>> ,
) -> (Option<Either<EstablishedWs, Self>>, bool) {
match result {
Ok(mut ws) => {
let peer_addr = ws.get_mut().get_peer_addr();
let established_ws = EstablishedWs {
ws,
peer_addr,
};
(Some(Either::Left(established_ws)), false)
},
Err(HandshakeError::Interrupted(handshake)) => {
(Some(Either::Right(HandshakeMachine::WsMidHandshake(handshake))), true)
},
Err(HandshakeError::Failure(err)) => {
info!("ws handshake error: {}", err);
(None, false)
}
}
}
}
pub struct EstablishedWs {
pub ws: WebSocket<Stream>,
pub peer_addr: SocketAddr,
}
pub struct Connection {
ws_config: WebSocketConfig,
pub valid_until: ValidUntil,
inner: Either<EstablishedWs, HandshakeMachine>,
inner: Either<EstablishedConnection, HandshakeMachine>,
}
/// Create from TcpStream. Run `advance_handshakes` until `get_established_ws`
/// returns Some(EstablishedWs).
///
/// advance_handshakes takes ownership of self because the TLS and WebSocket
/// handshake methods do. get_established_ws doesn't, since work can be done
/// on a mutable reference to a tungstenite websocket, and this way, the whole
/// Connection doesn't have to be removed from and reinserted into the
/// TorrentMap. This is also the reason for wrapping Container.inner in an
/// Either instead of combining all states into one structure just having a
/// single method for advancing handshakes and maybe returning a websocket.
/// advance_handshakes takes ownership of self because the TLS handshake
/// methods does. get_established doesn't, since work can be done on a mutable
/// reference to a tls stream, and this way, the whole connection doesn't have
/// to be removed/inserted into the ConnectionMap
impl Connection {
#[inline]
pub fn new(
ws_config: WebSocketConfig,
valid_until: ValidUntil,
tcp_stream: TcpStream,
) -> Self {
Self {
ws_config,
valid_until,
inner: Either::Right(HandshakeMachine::new(tcp_stream))
}
}
#[inline]
pub fn get_established_ws<'a>(&mut self) -> Option<&mut EstablishedWs> {
pub fn get_established(&mut self) -> Option<&mut EstablishedConnection> {
match self.inner {
Either::Left(ref mut ews) => Some(ews),
Either::Left(ref mut established) => Some(established),
Either::Right(_) => None,
}
}
@ -238,15 +267,11 @@ impl Connection {
match self.inner {
Either::Left(_) => (Some(self), false),
Either::Right(machine) => {
let ws_config = self.ws_config;
let (opt_inner, stop_loop) = machine.advance(
ws_config,
opt_tls_acceptor
);
let opt_new_self = opt_inner.map(|inner| Self {
ws_config,
valid_until,
inner
});
@ -255,24 +280,7 @@ impl Connection {
}
}
}
#[inline]
pub fn close(&mut self){
if let Either::Left(ref mut ews) = self.inner {
if ews.ws.can_read(){
ews.ws.close(None).unwrap();
// Required after ws.close()
if let Err(err) = ews.ws.write_pending(){
info!(
"error writing pending messages after closing ws: {}",
err
)
}
}
}
}
}
pub type ConnectionMap = HashMap<Token, Connection>;
pub type ConnectionMap<'a> = HashMap<Token, Connection>;