diff --git a/Cargo.lock b/Cargo.lock index b12f8ba..5088125 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,8 +62,8 @@ dependencies = [ "aquatic_cli_helpers", "aquatic_common", "aquatic_http_protocol", + "crossbeam-channel", "either", - "flume", "hashbrown", "indexmap", "itoa", diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index fad01d0..4d8893a 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -18,8 +18,8 @@ anyhow = "1" aquatic_cli_helpers = { path = "../aquatic_cli_helpers" } aquatic_common = { path = "../aquatic_common" } aquatic_http_protocol = { path = "../aquatic_http_protocol" } +crossbeam-channel = "0.4" either = "1" -flume = "0.7" hashbrown = "0.8" indexmap = "1" itoa = "0.4" diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index ada603e..629dff7 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -2,7 +2,7 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::Arc; use either::Either; -use flume::{Sender, Receiver}; +use crossbeam_channel::{Sender, Receiver}; use hashbrown::HashMap; use indexmap::IndexMap; use log::error; diff --git a/aquatic_http/src/lib/handler.rs b/aquatic_http/src/lib/handler.rs index a4d9bb4..7300e80 100644 --- a/aquatic_http/src/lib/handler.rs +++ b/aquatic_http/src/lib/handler.rs @@ -15,7 +15,6 @@ use crate::common::*; use crate::config::Config; - pub fn run_request_worker( config: Config, state: State, @@ -101,6 +100,8 @@ pub fn handle_announce_requests( request_sender_meta.peer_addr.ip() ); + ::log::debug!("peer ip: {:?}", peer_ip); + let response = match peer_ip { IpAddr::V4(peer_ip_address) => { let torrent_data: &mut TorrentData = torrent_maps.ipv4 diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index a18cc02..10df8fb 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -22,7 +22,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { let state = State::default(); - let (request_channel_sender, request_channel_receiver) = ::flume::unbounded(); + let (request_channel_sender, request_channel_receiver) = ::crossbeam_channel::unbounded(); let mut out_message_senders = Vec::new(); @@ -42,7 +42,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { let request_channel_sender = request_channel_sender.clone(); let opt_tls_acceptor = opt_tls_acceptor.clone(); - let (response_channel_sender, response_channel_receiver) = ::flume::unbounded(); + let (response_channel_sender, response_channel_receiver) = ::crossbeam_channel::unbounded(); out_message_senders.push(response_channel_sender); diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index 6cc677f..8be5daa 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -22,6 +22,9 @@ use connection::*; use utils::*; +const CONNECTION_CLEAN_INTERVAL: usize = 2 ^ 22; + + pub fn run_socket_worker( config: Config, socket_worker_index: usize, @@ -108,18 +111,16 @@ pub fn run_poll_loop( } } - let response_drain = response_channel_receiver.drain(); - - if !(local_responses.is_empty() & (response_drain.len() == 0)) { + if !(local_responses.is_empty() & (response_channel_receiver.is_empty())) { send_responses( local_responses.drain(..), - response_drain, + response_channel_receiver.try_iter(), &mut connections ); } // Remove inactive connections, but not every iteration - if iter_counter % 32768 == 0 { + if iter_counter % CONNECTION_CLEAN_INTERVAL == 0 { remove_inactive_connections(&mut connections); } @@ -301,10 +302,10 @@ pub fn handle_connection_read_event( /// Read responses from channel, send to peers pub fn send_responses( local_responses: Drain<(ConnectionMeta, Response)>, - response_channel_receiver: ::flume::Drain<(ConnectionMeta, Response)>, + channel_responses: crossbeam_channel::TryIter<(ConnectionMeta, Response)>, connections: &mut ConnectionMap, ){ - for (meta, response) in local_responses.chain(response_channel_receiver){ + for (meta, response) in local_responses.chain(channel_responses){ if let Some(established) = connections.get_mut(&meta.poll_token) .and_then(Connection::get_established) {