diff --git a/aquatic_http/src/lib/glommio/network.rs b/aquatic_http/src/lib/glommio/network.rs index ba04ef8..8bc1ff6 100644 --- a/aquatic_http/src/lib/glommio/network.rs +++ b/aquatic_http/src/lib/glommio/network.rs @@ -2,9 +2,11 @@ use std::io::{BufReader, Cursor, Read}; use std::rc::Rc; use std::sync::Arc; -use aquatic_http_protocol::request::Request; +use aquatic_http_protocol::request::{Request, RequestParseError}; use aquatic_http_protocol::response::Response; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; +use glommio::channels::shared_channel::{ConnectedSender, SharedSender}; use glommio::prelude::*; use glommio::net::{TcpListener, TcpStream}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; @@ -20,19 +22,24 @@ struct ConnectionReference { } struct Connection { + request_senders: Rc>, response_receiver: LocalReceiver, tls: ServerConnection, stream: TcpStream, index: usize, + expects_request: bool, } pub async fn run_socket_worker( config: Config, + request_mesh_builder: MeshBuilder, ) { let tls_config = Arc::new(create_tls_config(&config)); let config = Rc::new(config); let listener = TcpListener::bind(config.network.address).expect("bind socket"); + let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); + let request_senders = Rc::new(request_senders); let mut connection_slab: Slab = Slab::new(); @@ -46,14 +53,18 @@ pub async fn run_socket_worker( let entry = connection_slab.vacant_entry(); let conn = Connection { + request_senders: request_senders.clone(), response_receiver, tls: ServerConnection::new(tls_config.clone()).unwrap(), stream, index: entry.key(), + expects_request: true, }; async fn handle_stream(mut conn: Connection) { - conn.handle_stream().await; + if let Err(err) = conn.handle_stream().await { + ::log::error!("conn.handle_stream() error: {:?}", err); + } } let handle = spawn_local(handle_stream(conn)).detach(); @@ -74,78 +85,69 @@ pub async fn run_socket_worker( } impl Connection { - async fn handle_stream(&mut self){ + async fn handle_stream(&mut self) -> anyhow::Result<()> { loop { - while let Some(response) = self.response_receiver.stream().next().await { - response.write(&mut self.tls.writer()).unwrap(); + self.write_tls().await?; + self.read_tls().await; - let mut buf = Vec::new(); - let mut buf = Cursor::new(&mut buf); + if !self.tls.is_handshaking() { + if self.expects_request { + let request = self.extract_request()?; - while self.tls.wants_write() { - self.tls.write_tls(&mut buf).unwrap(); - } + self.request_senders.try_send_to(0, request); + self.expects_request = false; - self.stream.write_all(&buf.into_inner()).await.unwrap(); - } - } - } + } else if let Some(response) = self.response_receiver.recv().await { + response.write(&mut self.tls.writer())?; - async fn handle_stream_handshake(&mut self) { - let mut buf = [0u8; 1024]; - - loop { - match self.stream.read(&mut buf).await { - Ok(ciphertext_bytes_read) => { - let mut cursor = Cursor::new(&buf[..ciphertext_bytes_read]); - - match self.tls.read_tls(&mut cursor) { - Ok(plaintext_bytes_read) => { - match self.tls.process_new_packets() { - Ok(_) => { - if ciphertext_bytes_read == 0 && plaintext_bytes_read == 0 { - let mut request_bytes = Vec::new(); - - self.tls.reader().read_to_end(&mut request_bytes); - - match Request::from_bytes(&request_bytes[..]) { - Ok(request) => { - ::log::info!("request read: {:?}", request); - }, - Err(err) => { - // TODO: send error response, close connection - - ::log::info!("Request::from_bytes: {:?}", err); - - break - } - } - } - // TODO: check for io_state.peer_has_closed - }, - Err(err) => { - // TODO: call write_tls - ::log::info!("conn.process_new_packets: {:?}", err); - - break - } - } - }, - Err(err) => { - ::log::info!("conn.read_tls: {:?}", err); - } - } - }, - Err(err) => { - ::log::info!("stream.read: {:?}", err); + self.expects_request = true; } } } } + async fn read_tls(&mut self) -> anyhow::Result<()> { + while self.tls.wants_read() { + let mut buf = Vec::new(); + let _ciphertext_bytes_read = self.stream.read_to_end(&mut buf).await?; + let mut cursor = Cursor::new(&buf[..]); + + let _plaintext_bytes_read = self.tls.read_tls(&mut cursor)?; + + let _io_state = self.tls.process_new_packets()?; + } + + Ok(()) + } + + async fn write_tls(&mut self) -> anyhow::Result<()> { + if !self.tls.wants_write() { + return Ok(()); + } + + let mut buf = Vec::new(); + let mut buf = Cursor::new(&mut buf); + + while self.tls.wants_write() { + self.tls.write_tls(&mut buf)?; + } + + self.stream.write_all(&buf.into_inner()).await?; + + Ok(()) + } + + fn extract_request(&mut self) -> anyhow::Result { + let mut request_bytes = Vec::new(); + + self.tls.reader().read_to_end(&mut request_bytes)?; + + Request::from_bytes(&request_bytes[..]).map_err(|err| anyhow::anyhow!("{:?}", err)) + } } + fn create_tls_config( config: &Config, ) -> rustls::ServerConfig {