mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
aquatic_ws: replace flume with crossbeam-channel
This improved performance in aquatic_http
This commit is contained in:
parent
0d8b6f6cc4
commit
d5de57b45f
5 changed files with 14 additions and 114 deletions
|
|
@ -17,8 +17,8 @@ path = "src/bin/main.rs"
|
|||
anyhow = "1"
|
||||
aquatic_cli_helpers = { path = "../aquatic_cli_helpers" }
|
||||
aquatic_common = { path = "../aquatic_common" }
|
||||
crossbeam-channel = "0.4"
|
||||
either = "1"
|
||||
flume = "0.7"
|
||||
hashbrown = { version = "0.8", features = ["serde"] }
|
||||
indexmap = "1"
|
||||
log = "0.4"
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use flume::{Sender, Receiver};
|
||||
use crossbeam_channel::{Sender, Receiver};
|
||||
use hashbrown::HashMap;
|
||||
use indexmap::IndexMap;
|
||||
use log::error;
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
|
|||
|
||||
let state = State::default();
|
||||
|
||||
let (in_message_sender, in_message_receiver) = ::flume::unbounded();
|
||||
let (in_message_sender, in_message_receiver) = ::crossbeam_channel::unbounded();
|
||||
|
||||
let mut out_message_senders = Vec::new();
|
||||
|
||||
|
|
@ -45,7 +45,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
|
|||
let in_message_sender = in_message_sender.clone();
|
||||
let opt_tls_acceptor = opt_tls_acceptor.clone();
|
||||
|
||||
let (out_message_sender, out_message_receiver) = ::flume::unbounded();
|
||||
let (out_message_sender, out_message_receiver) = ::crossbeam_channel::unbounded();
|
||||
|
||||
out_message_senders.push(out_message_sender);
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
use std::time::Duration;
|
||||
use std::io::ErrorKind;
|
||||
|
||||
use crossbeam_channel::Receiver;
|
||||
use hashbrown::HashMap;
|
||||
use log::{info, debug, error};
|
||||
use native_tls::TlsAcceptor;
|
||||
|
|
@ -109,10 +110,12 @@ pub fn run_poll_loop(
|
|||
}
|
||||
}
|
||||
|
||||
send_out_messages(
|
||||
out_message_receiver.drain(),
|
||||
&mut connections
|
||||
);
|
||||
if !out_message_receiver.is_empty(){
|
||||
send_out_messages(
|
||||
&out_message_receiver,
|
||||
&mut connections
|
||||
);
|
||||
}
|
||||
|
||||
// Remove inactive connections, but not every iteration
|
||||
if iter_counter % 128 == 0 {
|
||||
|
|
@ -238,10 +241,10 @@ pub fn run_handshakes_and_read_messages(
|
|||
|
||||
/// Read messages from channel, send to peers
|
||||
pub fn send_out_messages(
|
||||
out_message_receiver: ::flume::Drain<(ConnectionMeta, OutMessage)>,
|
||||
out_message_receiver: &Receiver<(ConnectionMeta, OutMessage)>,
|
||||
connections: &mut ConnectionMap,
|
||||
){
|
||||
for (meta, out_message) in out_message_receiver {
|
||||
for (meta, out_message) in out_message_receiver.try_iter(){
|
||||
let opt_established_ws = connections.get_mut(&meta.poll_token)
|
||||
.and_then(Connection::get_established_ws);
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue