WIP: aquatic_http glommio impl

This commit is contained in:
Joakim Frostegård 2021-10-26 16:26:37 +02:00
parent ad7e464788
commit 34bc4046b7
4 changed files with 195 additions and 2 deletions

View file

@ -17,7 +17,7 @@ path = "src/bin/main.rs"
[features]
default = ["with-mio"]
with-glommio = ["glommio", "futures-lite"]
with-glommio = ["glommio", "futures-lite", "rustls", "rustls-pemfile"]
with-mio = ["crossbeam-channel", "histogram", "mio", "native-tls", "socket2"]
[dependencies]
@ -47,8 +47,10 @@ native-tls = { version = "0.2", optional = true }
socket2 = { version = "0.4.1", features = ["all"], optional = true }
# glommio
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true }
futures-lite = { version = "1", optional = true }
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true }
rustls = { version = "0.20", optional = true }
rustls-pemfile = { version = "0.2", optional = true }
[dev-dependencies]
quickcheck = "1.0"

View file

@ -2,6 +2,8 @@ use glommio::prelude::*;
use crate::config::Config;
mod network;
pub fn run(
config: Config,
) -> anyhow::Result<()> {

View file

@ -0,0 +1,119 @@
use std::io::{BufReader, Cursor, Read};
use std::rc::Rc;
use std::sync::Arc;
use aquatic_http_protocol::request::Request;
use futures_lite::{AsyncReadExt, StreamExt};
use glommio::prelude::*;
use glommio::net::{TcpListener, TcpStream};
use rustls::{IoState, ServerConnection};
use crate::config::Config;
pub async fn run_socket_worker(
config: Config,
) {
let tlsConfig = Arc::new(create_tls_config(&config));
let config = Rc::new(config);
let listener = TcpListener::bind(config.network.address).expect("bind socket");
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
match stream {
Ok(stream) => {
spawn_local(handle_stream(config.clone(), tlsConfig.clone(), stream)).detach();
},
Err(err) => {
::log::error!("accept connection: {:?}", err);
}
}
}
}
async fn handle_stream(
config: Rc<Config>,
tlsConfig: Arc<rustls::ServerConfig>,
mut stream: TcpStream,
){
let mut buf = [0u8; 1024];
let mut conn = ServerConnection::new(tlsConfig).unwrap();
loop {
match stream.read(&mut buf).await {
Ok(ciphertext_bytes_read) => {
let mut cursor = Cursor::new(&buf[..ciphertext_bytes_read]);
match conn.read_tls(&mut cursor) {
Ok(plaintext_bytes_read) => {
match conn.process_new_packets() {
Ok(_) => {
if ciphertext_bytes_read == 0 && plaintext_bytes_read == 0 {
let mut request_bytes = Vec::new();
conn.reader().read_to_end(&mut request_bytes);
match Request::from_bytes(&request_bytes[..]) {
Ok(request) => {
},
Err(err) => {
// TODO: return error response, close connection
}
}
}
// TODO: check for io_state.peer_has_closed
},
Err(err) => {
// TODO: call write_tls
::log::info!("conn.process_new_packets: {:?}", err);
break
}
}
},
Err(err) => {
::log::info!("conn.read_tls: {:?}", err);
}
}
},
Err(err) => {
::log::info!("stream.read: {:?}", err);
}
}
}
}
fn create_tls_config(
config: &Config,
) -> rustls::ServerConfig {
let mut certs = Vec::new();
let mut private_key = None;
use std::iter;
use rustls_pemfile::{Item, read_one};
let pemfile = Vec::new();
let mut reader = BufReader::new(&pemfile[..]);
for item in iter::from_fn(|| read_one(&mut reader).transpose()) {
match item.unwrap() {
Item::X509Certificate(cert) => {
certs.push(rustls::Certificate(cert));
},
Item::RSAKey(key) | Item::PKCS8Key(key) => {
if private_key.is_none(){
private_key = Some(rustls::PrivateKey(key));
}
}
}
}
rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(certs, private_key.expect("no private key"))
.expect("bad certificate/key")
}