aquatic_http: simplify network logic

This commit is contained in:
Joakim Frostegård 2020-07-02 22:14:52 +02:00
parent b43eeb4d65
commit ccfa03f6cc
2 changed files with 36 additions and 57 deletions

View file

@ -109,7 +109,7 @@ impl EstablishedConnection {
} }
enum HandshakeMachine { pub enum HandshakeMachine {
TcpStream(TcpStream), TcpStream(TcpStream),
TlsMidHandshake(MidHandshakeTlsStream<TcpStream>), TlsMidHandshake(MidHandshakeTlsStream<TcpStream>),
} }
@ -122,7 +122,7 @@ impl <'a>HandshakeMachine {
} }
#[inline] #[inline]
fn advance( pub fn advance(
self, self,
opt_tls_acceptor: &Option<TlsAcceptor>, // If set, run TLS opt_tls_acceptor: &Option<TlsAcceptor>, // If set, run TLS
) -> (Option<Either<EstablishedConnection, Self>>, bool) { // bool = stop looping ) -> (Option<Either<EstablishedConnection, Self>>, bool) { // bool = stop looping
@ -171,57 +171,28 @@ impl <'a>HandshakeMachine {
pub struct Connection { pub struct Connection {
pub valid_until: ValidUntil, pub valid_until: ValidUntil,
inner: Either<EstablishedConnection, HandshakeMachine>, pub inner: Either<EstablishedConnection, HandshakeMachine>,
} }
/// Create from TcpStream. Run `advance_handshakes` until `get_established_ws`
/// returns Some(EstablishedWs).
///
/// advance_handshakes takes ownership of self because the TLS handshake
/// methods does. get_established doesn't, since work can be done on a mutable
/// reference to a tls stream, and this way, the whole connection doesn't have
/// to be removed/inserted into the ConnectionMap
impl Connection { impl Connection {
#[inline] #[inline]
pub fn new( pub fn new(
use_tls: bool,
valid_until: ValidUntil, valid_until: ValidUntil,
tcp_stream: TcpStream, tcp_stream: TcpStream,
) -> Self { ) -> Self {
let inner = if use_tls {
Either::Right(HandshakeMachine::new(tcp_stream))
} else {
// If no TLS should be used, just go directly to established
// connection
Either::Left(EstablishedConnection::new(Stream::TcpStream(tcp_stream)))
};
Self { Self {
valid_until, valid_until,
inner: Either::Right(HandshakeMachine::new(tcp_stream)) inner,
}
}
#[inline]
pub fn get_established(&mut self) -> Option<&mut EstablishedConnection> {
match self.inner {
Either::Left(ref mut established) => Some(established),
Either::Right(_) => None,
}
}
#[inline]
pub fn advance_handshakes(
self,
opt_tls_acceptor: &Option<TlsAcceptor>,
valid_until: ValidUntil,
) -> (Option<Self>, bool) {
match self.inner {
Either::Left(_) => (Some(self), false),
Either::Right(machine) => {
let (opt_inner, stop_loop) = machine.advance(
opt_tls_acceptor
);
let opt_new_self = opt_inner.map(|inner| Self {
valid_until,
inner
});
(opt_new_self, stop_loop)
}
} }
} }
} }

View file

@ -24,6 +24,7 @@ fn accept_new_streams(
connections: &mut ConnectionMap, connections: &mut ConnectionMap,
valid_until: ValidUntil, valid_until: ValidUntil,
poll_token_counter: &mut Token, poll_token_counter: &mut Token,
use_tls: bool,
){ ){
loop { loop {
match listener.accept(){ match listener.accept(){
@ -43,7 +44,7 @@ fn accept_new_streams(
.register(&mut stream, token, Interest::READABLE) .register(&mut stream, token, Interest::READABLE)
.unwrap(); .unwrap();
let connection = Connection::new(valid_until, stream); let connection = Connection::new(use_tls, valid_until, stream);
connections.insert(token, connection); connections.insert(token, connection);
}, },
@ -70,15 +71,16 @@ pub fn run_handshake_and_read_requests(
valid_until: ValidUntil, valid_until: ValidUntil,
){ ){
loop { loop {
if let Some(established_connection) = connections.get_mut(&poll_token) let opt_established = connections.get_mut(&poll_token)
.and_then(Connection::get_established) .and_then(|c| c.inner.as_mut().left());
{
match established_connection.read_request(){ if let Some(established) = opt_established {
match established.read_request(){
Ok(request) => { Ok(request) => {
let meta = ConnectionMeta { let meta = ConnectionMeta {
worker_index: socket_worker_index, worker_index: socket_worker_index,
poll_token, poll_token,
peer_addr: established_connection.peer_addr peer_addr: established.peer_addr
}; };
debug!("read request, sending to handler"); debug!("read request, sending to handler");
@ -110,13 +112,19 @@ pub fn run_handshake_and_read_requests(
break; break;
}, },
} }
} else if let Some(connection) = connections.remove(&poll_token){ } else if let Some(handshake_machine) = connections.remove(&poll_token)
let (opt_new_connection, stop_loop) = connection.advance_handshakes( .and_then(|c| c.inner.right())
opt_tls_acceptor, {
valid_until let (opt_inner, stop_loop) = handshake_machine.advance(
opt_tls_acceptor
); );
if let Some(connection) = opt_new_connection { if let Some(inner) = opt_inner {
let connection = Connection {
valid_until,
inner
};
connections.insert(poll_token, connection); connections.insert(poll_token, connection);
} }
@ -134,10 +142,9 @@ pub fn send_responses(
connections: &mut ConnectionMap, connections: &mut ConnectionMap,
){ ){
for (meta, response) in response_channel_receiver { for (meta, response) in response_channel_receiver {
let opt_established = connections.get_mut(&meta.poll_token) if let Some(established) = connections.get_mut(&meta.poll_token)
.and_then(Connection::get_established); .and_then(|c| c.inner.as_mut().left())
{
if let Some(established) = opt_established {
if established.peer_addr != meta.peer_addr { if established.peer_addr != meta.peer_addr {
info!("socket worker error: peer socket addrs didn't match"); info!("socket worker error: peer socket addrs didn't match");
@ -217,6 +224,7 @@ pub fn run_poll_loop(
&mut connections, &mut connections,
valid_until, valid_until,
&mut poll_token_counter, &mut poll_token_counter,
opt_tls_acceptor.is_some(),
); );
} else { } else {
run_handshake_and_read_requests( run_handshake_and_read_requests(