From 96593c97fcbe87f64151bf4883234e982b75882d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 26 Oct 2021 21:12:34 +0200 Subject: [PATCH] WIP: aquatic_http glommio work --- aquatic_http/src/lib/glommio/network.rs | 112 +++++++++++++++++------- 1 file changed, 81 insertions(+), 31 deletions(-) diff --git a/aquatic_http/src/lib/glommio/network.rs b/aquatic_http/src/lib/glommio/network.rs index 904b32e..2737234 100644 --- a/aquatic_http/src/lib/glommio/network.rs +++ b/aquatic_http/src/lib/glommio/network.rs @@ -1,5 +1,5 @@ use std::cell::RefCell; -use std::io::{BufReader, Cursor, Read}; +use std::io::{BufReader, Cursor, ErrorKind, Read}; use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -27,12 +27,13 @@ struct ConnectionReference { } struct Connection { - request_senders: Rc>, - response_receiver: LocalReceiver, + // request_senders: Rc>, + // response_receiver: LocalReceiver, tls: ServerConnection, stream: TcpStream, index: ConnectionId, expects_request: bool, + request_buffer: Vec, } pub async fn run_socket_worker( @@ -47,16 +48,16 @@ pub async fn run_socket_worker( let listener = TcpListener::bind(config.network.address).expect("bind socket"); num_bound_sockets.fetch_add(1, Ordering::SeqCst); - let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); + // let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); - let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); - let request_senders = Rc::new(request_senders); + // let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); + // let request_senders = Rc::new(request_senders); let connection_slab = Rc::new(RefCell::new(Slab::new())); - for (_, response_receiver) in response_receivers.streams() { - spawn_local(receive_responses(response_receiver, connection_slab.clone())).detach(); - } + // for (_, response_receiver) in response_receivers.streams() { + // spawn_local(receive_responses(response_receiver, connection_slab.clone())).detach(); + // } let mut incoming = listener.incoming(); @@ -69,12 +70,13 @@ pub async fn run_socket_worker( let entry = slab.vacant_entry(); let conn = Connection { - request_senders: request_senders.clone(), - response_receiver, + // request_senders: request_senders.clone(), + // response_receiver, tls: ServerConnection::new(tls_config.clone()).unwrap(), stream, index: ConnectionId(entry.key()), expects_request: true, + request_buffer: Vec::new(), }; async fn handle_stream(mut conn: Connection) { @@ -113,37 +115,91 @@ async fn receive_responses( impl Connection { async fn handle_stream(&mut self) -> anyhow::Result<()> { + ::log::info!("incoming stream"); loop { self.write_tls().await?; - self.read_tls().await; + self.read_tls().await?; + /* if !self.tls.is_handshaking() { if self.expects_request { let request = self.extract_request()?; - self.request_senders.try_send_to(0, (self.index, request)); + ::log::info!("request received: {:?}", request); + + // self.request_senders.try_send_to(0, (self.index, request)); self.expects_request = false; - } else if let Some(response) = self.response_receiver.recv().await { + }/* + else if let Some(response) = self.response_receiver.recv().await { response.write(&mut self.tls.writer())?; self.expects_request = true; - } + } */ } + */ } } async fn read_tls(&mut self) -> anyhow::Result<()> { - while self.tls.wants_read() { - let mut buf = Vec::new(); + loop { + ::log::info!("read_tls (wants read)"); - let _ciphertext_bytes_read = self.stream.read_to_end(&mut buf).await?; + let mut buf = [0u8; 1024]; - let mut cursor = Cursor::new(&buf[..]); + let bytes_read = self.stream.read(&mut buf).await?; - let _plaintext_bytes_read = self.tls.read_tls(&mut cursor)?; + if bytes_read == 0 { + // Peer has closed connection. Remove it. + return Err(anyhow::anyhow!("peer has closed connection")); + } - let _io_state = self.tls.process_new_packets()?; + let _ = self.tls.read_tls(&mut &buf[..bytes_read]).unwrap(); + + let io_state = self.tls.process_new_packets()?; + + let mut added_plaintext = false; + + while io_state.plaintext_bytes_to_read() != 0 { + match self.tls.reader().read(&mut buf) { + Ok(0) => { + break; + } + Ok(amt) => { + self.request_buffer.extend_from_slice(&buf[..amt]); + + added_plaintext = true; + }, + Err(err) if err.kind() == ErrorKind::WouldBlock => { + break; + } + Err(err) => { + ::log::info!("tls.reader().read error: {:?}", err); + + break; + } + } + } + + if added_plaintext { + match Request::from_bytes(&self.request_buffer[..]) { + Ok(request) => { + self.expects_request = false; + + ::log::info!("received request: {:?}", request); + } + Err(RequestParseError::NeedMoreData) => { + ::log::info!("need more request data. current data: {:?}", std::str::from_utf8(&self.request_buffer)); + } + Err(RequestParseError::Invalid(err)) => { + return Err(anyhow::anyhow!("request parse error: {:?}", err)); + } + } + } + + if self.tls.wants_write() { + break + } } Ok(()) @@ -154,23 +210,17 @@ impl Connection { return Ok(()); } + ::log::info!("write_tls (wants write)"); + let mut buf = Vec::new(); let mut buf = Cursor::new(&mut buf); while self.tls.wants_write() { - self.tls.write_tls(&mut buf)?; + self.tls.write_tls(&mut buf).unwrap(); } - self.stream.write_all(&buf.into_inner()).await?; + self.stream.write_all(&buf.into_inner()).await.unwrap(); Ok(()) } - - fn extract_request(&mut self) -> anyhow::Result { - let mut request_bytes = Vec::new(); - - self.tls.reader().read_to_end(&mut request_bytes)?; - - Request::from_bytes(&request_bytes[..]).map_err(|err| anyhow::anyhow!("{:?}", err)) - } }