aquatic_ws: mio: replace native_tls with rustls, rewrite connection state logic completely (#38)

* ws: mio: use rustls, rewrite Connection logic

* ws: mio: improve poll register/deregister handling

* ws: mio: work on type-level poll registry safety

* ws: mio: use stronger poll registry type-level guarantees

* ws: mio: fix stream reading

* ws: clean up, run fmt

* ws: mio: don't require registered connection for writing, improve docs

* ws: mio: add Connection::get_meta(), make Connection::meta private

* ws: mio: add ConnectionMap struct; remove utils.rs

* ws: mio: move token counter into ConnectionMap, improve docs

* ws: mio: connection: move Connection struct above state structs

* Update TODO

* ws: fix build errors

* ws: upgrade to tungstenite 0.16

* ws load test: don't panic on Close message; print shorter errors

* ws: fix socket worker bugs, add log statements

* ws: mio: wait for write availability if would block for ws messages

* Update README

* ws: mio: limit channels & queues; read 1 message only; other fixes

* ws: mio: send local responses each event; decrease channel size

* Update TODO

* ws: mio: limit ws send queue, fixing memory leak; limit pending messages

Also change some log output levels and run rustfmt

* Update TODO

* Update TODO
This commit is contained in:
Joakim Frostegård 2021-12-16 15:09:36 +00:00 committed by GitHub
parent 222fac0e09
commit 67c4c02bbd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 893 additions and 776 deletions

View file

@ -22,6 +22,7 @@ futures = "0.3"
futures-rustls = "0.22"
glommio = { git = "https://github.com/DataDog/glommio.git", rev = "2efe2f2a08f54394a435b674e8e0125057cbff03" }
hashbrown = { version = "0.11", features = ["serde"] }
log = "0.4"
mimalloc = { version = "0.1", default-features = false }
rand = { version = "0.8", features = ["small_rng"] }
rand_distr = "0.4"

View file

@ -105,7 +105,7 @@ impl Connection {
*num_active_connections.borrow_mut() += 1;
if let Err(err) = connection.run_connection_loop().await {
eprintln!("connection error: {:?}", err);
eprintln!("connection error: {}", err);
}
*num_active_connections.borrow_mut() -= 1;
@ -159,7 +159,26 @@ impl Connection {
}
async fn read_message(&mut self) -> anyhow::Result<()> {
match OutMessage::from_ws_message(self.stream.next().await.unwrap()?) {
let message = match self
.stream
.next()
.await
.ok_or_else(|| anyhow::anyhow!("stream finished"))??
{
message @ tungstenite::Message::Text(_) | message @ tungstenite::Message::Binary(_) => {
message
}
message => {
eprintln!(
"Received WebSocket message of unexpected type: {:?}",
message
);
return Ok(());
}
};
match OutMessage::from_ws_message(message) {
Ok(OutMessage::Offer(offer)) => {
self.load_test_state
.statistics
@ -205,7 +224,7 @@ impl Connection {
self.can_send = true;
}
Err(err) => {
eprintln!("error deserializing offer: {:?}", err);
eprintln!("error deserializing message: {:?}", err);
}
}