diff --git a/Cargo.lock b/Cargo.lock index b604d6e..d4959b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -113,6 +113,7 @@ dependencies = [ "rustls", "rustls-pemfile", "serde", + "slab", "smartstring", "socket2 0.4.2", ] diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index 73ff3c6..97f7bab 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -17,7 +17,7 @@ path = "src/bin/main.rs" [features] default = ["with-mio"] -with-glommio = ["glommio", "futures-lite", "rustls", "rustls-pemfile"] +with-glommio = ["glommio", "futures-lite", "rustls", "rustls-pemfile", "slab"] with-mio = ["crossbeam-channel", "histogram", "mio", "native-tls", "socket2"] [dependencies] @@ -51,6 +51,7 @@ futures-lite = { version = "1", optional = true } glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true } rustls = { version = "0.20", optional = true } rustls-pemfile = { version = "0.2", optional = true } +slab = { version = "0.4", optional = true } [dev-dependencies] quickcheck = "1.0" diff --git a/aquatic_http/src/lib/glommio/network.rs b/aquatic_http/src/lib/glommio/network.rs index b1893eb..ba04ef8 100644 --- a/aquatic_http/src/lib/glommio/network.rs +++ b/aquatic_http/src/lib/glommio/network.rs @@ -3,27 +3,67 @@ use std::rc::Rc; use std::sync::Arc; use aquatic_http_protocol::request::Request; -use futures_lite::{AsyncReadExt, StreamExt}; +use aquatic_http_protocol::response::Response; +use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; use glommio::prelude::*; use glommio::net::{TcpListener, TcpStream}; +use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; +use glommio::task::JoinHandle; use rustls::{IoState, ServerConnection}; +use slab::Slab; use crate::config::Config; +struct ConnectionReference { + response_sender: LocalSender, + handle: JoinHandle<()>, +} + +struct Connection { + response_receiver: LocalReceiver, + tls: ServerConnection, + stream: TcpStream, + index: usize, +} + pub async fn run_socket_worker( config: Config, ) { - let tlsConfig = Arc::new(create_tls_config(&config)); + let tls_config = Arc::new(create_tls_config(&config)); let config = Rc::new(config); let listener = TcpListener::bind(config.network.address).expect("bind socket"); + let mut connection_slab: Slab = Slab::new(); + let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { match stream { Ok(stream) => { - spawn_local(handle_stream(config.clone(), tlsConfig.clone(), stream)).detach(); + let (response_sender, response_receiver) = new_bounded(1); + + let entry = connection_slab.vacant_entry(); + + let conn = Connection { + response_receiver, + tls: ServerConnection::new(tls_config.clone()).unwrap(), + stream, + index: entry.key(), + }; + + async fn handle_stream(mut conn: Connection) { + conn.handle_stream().await; + } + + let handle = spawn_local(handle_stream(conn)).detach(); + + let connection_reference = ConnectionReference { + response_sender, + handle, + }; + + entry.insert(connection_reference); }, Err(err) => { ::log::error!("accept connection: {:?}", err); @@ -33,59 +73,79 @@ pub async fn run_socket_worker( } } -async fn handle_stream( - config: Rc, - tlsConfig: Arc, - mut stream: TcpStream, -){ - let mut buf = [0u8; 1024]; - let mut conn = ServerConnection::new(tlsConfig).unwrap(); +impl Connection { + async fn handle_stream(&mut self){ + loop { + while let Some(response) = self.response_receiver.stream().next().await { + response.write(&mut self.tls.writer()).unwrap(); - loop { - match stream.read(&mut buf).await { - Ok(ciphertext_bytes_read) => { - let mut cursor = Cursor::new(&buf[..ciphertext_bytes_read]); + let mut buf = Vec::new(); + let mut buf = Cursor::new(&mut buf); - match conn.read_tls(&mut cursor) { - Ok(plaintext_bytes_read) => { - match conn.process_new_packets() { - Ok(_) => { - if ciphertext_bytes_read == 0 && plaintext_bytes_read == 0 { - let mut request_bytes = Vec::new(); - - conn.reader().read_to_end(&mut request_bytes); - - match Request::from_bytes(&request_bytes[..]) { - Ok(request) => { - - }, - Err(err) => { - // TODO: return error response, close connection - } - } - } - // TODO: check for io_state.peer_has_closed - }, - Err(err) => { - // TODO: call write_tls - ::log::info!("conn.process_new_packets: {:?}", err); - - break - } - } - }, - Err(err) => { - ::log::info!("conn.read_tls: {:?}", err); - } + while self.tls.wants_write() { + self.tls.write_tls(&mut buf).unwrap(); } - }, - Err(err) => { - ::log::info!("stream.read: {:?}", err); + + self.stream.write_all(&buf.into_inner()).await.unwrap(); } } } -} + async fn handle_stream_handshake(&mut self) { + let mut buf = [0u8; 1024]; + + loop { + match self.stream.read(&mut buf).await { + Ok(ciphertext_bytes_read) => { + let mut cursor = Cursor::new(&buf[..ciphertext_bytes_read]); + + match self.tls.read_tls(&mut cursor) { + Ok(plaintext_bytes_read) => { + match self.tls.process_new_packets() { + Ok(_) => { + if ciphertext_bytes_read == 0 && plaintext_bytes_read == 0 { + let mut request_bytes = Vec::new(); + + self.tls.reader().read_to_end(&mut request_bytes); + + match Request::from_bytes(&request_bytes[..]) { + Ok(request) => { + ::log::info!("request read: {:?}", request); + }, + Err(err) => { + // TODO: send error response, close connection + + ::log::info!("Request::from_bytes: {:?}", err); + + break + } + } + } + // TODO: check for io_state.peer_has_closed + }, + Err(err) => { + // TODO: call write_tls + ::log::info!("conn.process_new_packets: {:?}", err); + + break + } + } + }, + Err(err) => { + ::log::info!("conn.read_tls: {:?}", err); + } + } + }, + Err(err) => { + ::log::info!("stream.read: {:?}", err); + } + } + } + } + + + +} fn create_tls_config( config: &Config, ) -> rustls::ServerConfig {