From 34bc4046b76a9e2c513ec875a5e20c1d4201ba50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 26 Oct 2021 16:26:37 +0200 Subject: [PATCH] WIP: aquatic_http glommio impl --- Cargo.lock | 70 ++++++++++++++ aquatic_http/Cargo.toml | 6 +- aquatic_http/src/lib/glommio/mod.rs | 2 + aquatic_http/src/lib/glommio/network.rs | 119 ++++++++++++++++++++++++ 4 files changed, 195 insertions(+), 2 deletions(-) create mode 100644 aquatic_http/src/lib/glommio/network.rs diff --git a/Cargo.lock b/Cargo.lock index 607e122..b604d6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,8 @@ dependencies = [ "quickcheck", "quickcheck_macros", "rand", + "rustls", + "rustls-pemfile", "serde", "smartstring", "socket2 0.4.2", @@ -1545,6 +1547,21 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi 0.3.9", +] + [[package]] name = "rlimit" version = "0.6.2" @@ -1569,6 +1586,27 @@ dependencies = [ "semver", ] +[[package]] +name = "rustls" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b5ac6078ca424dc1d3ae2328526a76787fecc7f8011f520e3276730e711fc95" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +dependencies = [ + "base64", +] + [[package]] name = "ryu" version = "1.0.5" @@ -1606,6 +1644,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.4.2" @@ -1780,6 +1828,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "static_assertions" version = "1.1.0" @@ -1999,6 +2053,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.2.2" @@ -2134,6 +2194,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "winapi" version = "0.2.8" diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index 092bd5d..73ff3c6 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -17,7 +17,7 @@ path = "src/bin/main.rs" [features] default = ["with-mio"] -with-glommio = ["glommio", "futures-lite"] +with-glommio = ["glommio", "futures-lite", "rustls", "rustls-pemfile"] with-mio = ["crossbeam-channel", "histogram", "mio", "native-tls", "socket2"] [dependencies] @@ -47,8 +47,10 @@ native-tls = { version = "0.2", optional = true } socket2 = { version = "0.4.1", features = ["all"], optional = true } # glommio -glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true } futures-lite = { version = "1", optional = true } +glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true } +rustls = { version = "0.20", optional = true } +rustls-pemfile = { version = "0.2", optional = true } [dev-dependencies] quickcheck = "1.0" diff --git a/aquatic_http/src/lib/glommio/mod.rs b/aquatic_http/src/lib/glommio/mod.rs index 0f2955f..1bf5358 100644 --- a/aquatic_http/src/lib/glommio/mod.rs +++ b/aquatic_http/src/lib/glommio/mod.rs @@ -2,6 +2,8 @@ use glommio::prelude::*; use crate::config::Config; +mod network; + pub fn run( config: Config, ) -> anyhow::Result<()> { diff --git a/aquatic_http/src/lib/glommio/network.rs b/aquatic_http/src/lib/glommio/network.rs new file mode 100644 index 0000000..b1893eb --- /dev/null +++ b/aquatic_http/src/lib/glommio/network.rs @@ -0,0 +1,119 @@ +use std::io::{BufReader, Cursor, Read}; +use std::rc::Rc; +use std::sync::Arc; + +use aquatic_http_protocol::request::Request; +use futures_lite::{AsyncReadExt, StreamExt}; +use glommio::prelude::*; +use glommio::net::{TcpListener, TcpStream}; +use rustls::{IoState, ServerConnection}; + +use crate::config::Config; + +pub async fn run_socket_worker( + config: Config, +) { + let tlsConfig = Arc::new(create_tls_config(&config)); + let config = Rc::new(config); + + let listener = TcpListener::bind(config.network.address).expect("bind socket"); + + let mut incoming = listener.incoming(); + + while let Some(stream) = incoming.next().await { + match stream { + Ok(stream) => { + spawn_local(handle_stream(config.clone(), tlsConfig.clone(), stream)).detach(); + }, + Err(err) => { + ::log::error!("accept connection: {:?}", err); + } + } + + } +} + +async fn handle_stream( + config: Rc, + tlsConfig: Arc, + mut stream: TcpStream, +){ + let mut buf = [0u8; 1024]; + let mut conn = ServerConnection::new(tlsConfig).unwrap(); + + loop { + match stream.read(&mut buf).await { + Ok(ciphertext_bytes_read) => { + let mut cursor = Cursor::new(&buf[..ciphertext_bytes_read]); + + match conn.read_tls(&mut cursor) { + Ok(plaintext_bytes_read) => { + match conn.process_new_packets() { + Ok(_) => { + if ciphertext_bytes_read == 0 && plaintext_bytes_read == 0 { + let mut request_bytes = Vec::new(); + + conn.reader().read_to_end(&mut request_bytes); + + match Request::from_bytes(&request_bytes[..]) { + Ok(request) => { + + }, + Err(err) => { + // TODO: return error response, close connection + } + } + } + // TODO: check for io_state.peer_has_closed + }, + Err(err) => { + // TODO: call write_tls + ::log::info!("conn.process_new_packets: {:?}", err); + + break + } + } + }, + Err(err) => { + ::log::info!("conn.read_tls: {:?}", err); + } + } + }, + Err(err) => { + ::log::info!("stream.read: {:?}", err); + } + } + } +} + +fn create_tls_config( + config: &Config, +) -> rustls::ServerConfig { + let mut certs = Vec::new(); + let mut private_key = None; + + use std::iter; + use rustls_pemfile::{Item, read_one}; + + let pemfile = Vec::new(); + let mut reader = BufReader::new(&pemfile[..]); + + for item in iter::from_fn(|| read_one(&mut reader).transpose()) { + match item.unwrap() { + Item::X509Certificate(cert) => { + certs.push(rustls::Certificate(cert)); + }, + Item::RSAKey(key) | Item::PKCS8Key(key) => { + if private_key.is_none(){ + private_key = Some(rustls::PrivateKey(key)); + } + } + } + } + + rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(certs, private_key.expect("no private key")) + .expect("bad certificate/key") +} \ No newline at end of file