mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-01 10:15:31 +00:00
WIP: aquatic http glommio
This commit is contained in:
parent
34bc4046b7
commit
ef10c4f366
3 changed files with 112 additions and 50 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -113,6 +113,7 @@ dependencies = [
|
||||||
"rustls",
|
"rustls",
|
||||||
"rustls-pemfile",
|
"rustls-pemfile",
|
||||||
"serde",
|
"serde",
|
||||||
|
"slab",
|
||||||
"smartstring",
|
"smartstring",
|
||||||
"socket2 0.4.2",
|
"socket2 0.4.2",
|
||||||
]
|
]
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ path = "src/bin/main.rs"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["with-mio"]
|
default = ["with-mio"]
|
||||||
with-glommio = ["glommio", "futures-lite", "rustls", "rustls-pemfile"]
|
with-glommio = ["glommio", "futures-lite", "rustls", "rustls-pemfile", "slab"]
|
||||||
with-mio = ["crossbeam-channel", "histogram", "mio", "native-tls", "socket2"]
|
with-mio = ["crossbeam-channel", "histogram", "mio", "native-tls", "socket2"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|
@ -51,6 +51,7 @@ futures-lite = { version = "1", optional = true }
|
||||||
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true }
|
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true }
|
||||||
rustls = { version = "0.20", optional = true }
|
rustls = { version = "0.20", optional = true }
|
||||||
rustls-pemfile = { version = "0.2", optional = true }
|
rustls-pemfile = { version = "0.2", optional = true }
|
||||||
|
slab = { version = "0.4", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
quickcheck = "1.0"
|
quickcheck = "1.0"
|
||||||
|
|
|
||||||
|
|
@ -3,27 +3,67 @@ use std::rc::Rc;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use aquatic_http_protocol::request::Request;
|
use aquatic_http_protocol::request::Request;
|
||||||
use futures_lite::{AsyncReadExt, StreamExt};
|
use aquatic_http_protocol::response::Response;
|
||||||
|
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
|
||||||
use glommio::prelude::*;
|
use glommio::prelude::*;
|
||||||
use glommio::net::{TcpListener, TcpStream};
|
use glommio::net::{TcpListener, TcpStream};
|
||||||
|
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
|
||||||
|
use glommio::task::JoinHandle;
|
||||||
use rustls::{IoState, ServerConnection};
|
use rustls::{IoState, ServerConnection};
|
||||||
|
use slab::Slab;
|
||||||
|
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
|
|
||||||
|
struct ConnectionReference {
|
||||||
|
response_sender: LocalSender<Response>,
|
||||||
|
handle: JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Connection {
|
||||||
|
response_receiver: LocalReceiver<Response>,
|
||||||
|
tls: ServerConnection,
|
||||||
|
stream: TcpStream,
|
||||||
|
index: usize,
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn run_socket_worker(
|
pub async fn run_socket_worker(
|
||||||
config: Config,
|
config: Config,
|
||||||
) {
|
) {
|
||||||
let tlsConfig = Arc::new(create_tls_config(&config));
|
let tls_config = Arc::new(create_tls_config(&config));
|
||||||
let config = Rc::new(config);
|
let config = Rc::new(config);
|
||||||
|
|
||||||
let listener = TcpListener::bind(config.network.address).expect("bind socket");
|
let listener = TcpListener::bind(config.network.address).expect("bind socket");
|
||||||
|
|
||||||
|
let mut connection_slab: Slab<ConnectionReference> = Slab::new();
|
||||||
|
|
||||||
let mut incoming = listener.incoming();
|
let mut incoming = listener.incoming();
|
||||||
|
|
||||||
while let Some(stream) = incoming.next().await {
|
while let Some(stream) = incoming.next().await {
|
||||||
match stream {
|
match stream {
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
spawn_local(handle_stream(config.clone(), tlsConfig.clone(), stream)).detach();
|
let (response_sender, response_receiver) = new_bounded(1);
|
||||||
|
|
||||||
|
let entry = connection_slab.vacant_entry();
|
||||||
|
|
||||||
|
let conn = Connection {
|
||||||
|
response_receiver,
|
||||||
|
tls: ServerConnection::new(tls_config.clone()).unwrap(),
|
||||||
|
stream,
|
||||||
|
index: entry.key(),
|
||||||
|
};
|
||||||
|
|
||||||
|
async fn handle_stream(mut conn: Connection) {
|
||||||
|
conn.handle_stream().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let handle = spawn_local(handle_stream(conn)).detach();
|
||||||
|
|
||||||
|
let connection_reference = ConnectionReference {
|
||||||
|
response_sender,
|
||||||
|
handle,
|
||||||
|
};
|
||||||
|
|
||||||
|
entry.insert(connection_reference);
|
||||||
},
|
},
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
::log::error!("accept connection: {:?}", err);
|
::log::error!("accept connection: {:?}", err);
|
||||||
|
|
@ -33,59 +73,79 @@ pub async fn run_socket_worker(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_stream(
|
impl Connection {
|
||||||
config: Rc<Config>,
|
async fn handle_stream(&mut self){
|
||||||
tlsConfig: Arc<rustls::ServerConfig>,
|
loop {
|
||||||
mut stream: TcpStream,
|
while let Some(response) = self.response_receiver.stream().next().await {
|
||||||
){
|
response.write(&mut self.tls.writer()).unwrap();
|
||||||
let mut buf = [0u8; 1024];
|
|
||||||
let mut conn = ServerConnection::new(tlsConfig).unwrap();
|
|
||||||
|
|
||||||
loop {
|
let mut buf = Vec::new();
|
||||||
match stream.read(&mut buf).await {
|
let mut buf = Cursor::new(&mut buf);
|
||||||
Ok(ciphertext_bytes_read) => {
|
|
||||||
let mut cursor = Cursor::new(&buf[..ciphertext_bytes_read]);
|
|
||||||
|
|
||||||
match conn.read_tls(&mut cursor) {
|
while self.tls.wants_write() {
|
||||||
Ok(plaintext_bytes_read) => {
|
self.tls.write_tls(&mut buf).unwrap();
|
||||||
match conn.process_new_packets() {
|
|
||||||
Ok(_) => {
|
|
||||||
if ciphertext_bytes_read == 0 && plaintext_bytes_read == 0 {
|
|
||||||
let mut request_bytes = Vec::new();
|
|
||||||
|
|
||||||
conn.reader().read_to_end(&mut request_bytes);
|
|
||||||
|
|
||||||
match Request::from_bytes(&request_bytes[..]) {
|
|
||||||
Ok(request) => {
|
|
||||||
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
// TODO: return error response, close connection
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// TODO: check for io_state.peer_has_closed
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
// TODO: call write_tls
|
|
||||||
::log::info!("conn.process_new_packets: {:?}", err);
|
|
||||||
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
::log::info!("conn.read_tls: {:?}", err);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
|
||||||
Err(err) => {
|
self.stream.write_all(&buf.into_inner()).await.unwrap();
|
||||||
::log::info!("stream.read: {:?}", err);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
async fn handle_stream_handshake(&mut self) {
|
||||||
|
let mut buf = [0u8; 1024];
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match self.stream.read(&mut buf).await {
|
||||||
|
Ok(ciphertext_bytes_read) => {
|
||||||
|
let mut cursor = Cursor::new(&buf[..ciphertext_bytes_read]);
|
||||||
|
|
||||||
|
match self.tls.read_tls(&mut cursor) {
|
||||||
|
Ok(plaintext_bytes_read) => {
|
||||||
|
match self.tls.process_new_packets() {
|
||||||
|
Ok(_) => {
|
||||||
|
if ciphertext_bytes_read == 0 && plaintext_bytes_read == 0 {
|
||||||
|
let mut request_bytes = Vec::new();
|
||||||
|
|
||||||
|
self.tls.reader().read_to_end(&mut request_bytes);
|
||||||
|
|
||||||
|
match Request::from_bytes(&request_bytes[..]) {
|
||||||
|
Ok(request) => {
|
||||||
|
::log::info!("request read: {:?}", request);
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
// TODO: send error response, close connection
|
||||||
|
|
||||||
|
::log::info!("Request::from_bytes: {:?}", err);
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO: check for io_state.peer_has_closed
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
// TODO: call write_tls
|
||||||
|
::log::info!("conn.process_new_packets: {:?}", err);
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
::log::info!("conn.read_tls: {:?}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
::log::info!("stream.read: {:?}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
fn create_tls_config(
|
fn create_tls_config(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
) -> rustls::ServerConfig {
|
) -> rustls::ServerConfig {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue