From d5de57b45f0b41fc7054c5c1d15dca3043827d03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 31 Jul 2020 01:47:45 +0200 Subject: [PATCH] aquatic_ws: replace flume with crossbeam-channel This improved performance in aquatic_http --- Cargo.lock | 105 +----------------------------- aquatic_ws/Cargo.toml | 2 +- aquatic_ws/src/lib/common.rs | 2 +- aquatic_ws/src/lib/lib.rs | 4 +- aquatic_ws/src/lib/network/mod.rs | 15 +++-- 5 files changed, 14 insertions(+), 114 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5088125..b9f660e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,8 +195,8 @@ dependencies = [ "anyhow", "aquatic_cli_helpers", "aquatic_common", + "crossbeam-channel", "either", - "flume", "hashbrown", "indexmap", "log", @@ -647,16 +647,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" -[[package]] -name = "flume" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "855e285c3835897065a6ba6f9463b44553eb9f29c7988d692f3d41283b47388b" -dependencies = [ - "futures", - "spin", -] - [[package]] name = "fnv" version = "1.0.7" @@ -678,67 +668,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" -[[package]] -name = "futures" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" -dependencies = [ - "futures-channel", - "futures-core", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-channel" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" -dependencies = [ - "futures-core", - "futures-sink", -] - -[[package]] -name = "futures-core" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" - -[[package]] -name = "futures-io" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" - -[[package]] -name = "futures-sink" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" - -[[package]] -name = "futures-task" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" - -[[package]] -name = "futures-util" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" -dependencies = [ - "futures-core", - "futures-sink", - "futures-task", - "pin-project", - "pin-utils", -] - [[package]] name = "generic-array" version = "0.12.3" @@ -1268,32 +1197,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" -[[package]] -name = "pin-project" -version = "0.4.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12e3a6cdbfe94a5e4572812a0201f8c0ed98c1c452c7b8563ce2276988ef9c17" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "0.4.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - [[package]] name = "pkg-config" version = "0.3.18" @@ -1709,12 +1612,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - [[package]] name = "static_assertions" version = "1.1.0" diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 10bb580..5eed980 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -17,8 +17,8 @@ path = "src/bin/main.rs" anyhow = "1" aquatic_cli_helpers = { path = "../aquatic_cli_helpers" } aquatic_common = { path = "../aquatic_common" } +crossbeam-channel = "0.4" either = "1" -flume = "0.7" hashbrown = { version = "0.8", features = ["serde"] } indexmap = "1" log = "0.4" diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs index 73e24f7..f631f3e 100644 --- a/aquatic_ws/src/lib/common.rs +++ b/aquatic_ws/src/lib/common.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; -use flume::{Sender, Receiver}; +use crossbeam_channel::{Sender, Receiver}; use hashbrown::HashMap; use indexmap::IndexMap; use log::error; diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index f656f1b..e21a77d 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -25,7 +25,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { let state = State::default(); - let (in_message_sender, in_message_receiver) = ::flume::unbounded(); + let (in_message_sender, in_message_receiver) = ::crossbeam_channel::unbounded(); let mut out_message_senders = Vec::new(); @@ -45,7 +45,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { let in_message_sender = in_message_sender.clone(); let opt_tls_acceptor = opt_tls_acceptor.clone(); - let (out_message_sender, out_message_receiver) = ::flume::unbounded(); + let (out_message_sender, out_message_receiver) = ::crossbeam_channel::unbounded(); out_message_senders.push(out_message_sender); diff --git a/aquatic_ws/src/lib/network/mod.rs b/aquatic_ws/src/lib/network/mod.rs index d2f41ed..4bc159b 100644 --- a/aquatic_ws/src/lib/network/mod.rs +++ b/aquatic_ws/src/lib/network/mod.rs @@ -1,6 +1,7 @@ use std::time::Duration; use std::io::ErrorKind; +use crossbeam_channel::Receiver; use hashbrown::HashMap; use log::{info, debug, error}; use native_tls::TlsAcceptor; @@ -109,10 +110,12 @@ pub fn run_poll_loop( } } - send_out_messages( - out_message_receiver.drain(), - &mut connections - ); + if !out_message_receiver.is_empty(){ + send_out_messages( + &out_message_receiver, + &mut connections + ); + } // Remove inactive connections, but not every iteration if iter_counter % 128 == 0 { @@ -238,10 +241,10 @@ pub fn run_handshakes_and_read_messages( /// Read messages from channel, send to peers pub fn send_out_messages( - out_message_receiver: ::flume::Drain<(ConnectionMeta, OutMessage)>, + out_message_receiver: &Receiver<(ConnectionMeta, OutMessage)>, connections: &mut ConnectionMap, ){ - for (meta, out_message) in out_message_receiver { + for (meta, out_message) in out_message_receiver.try_iter(){ let opt_established_ws = connections.get_mut(&meta.poll_token) .and_then(Connection::get_established_ws);