aquatic_http: replace flume with crossbeam; clean connections less often

This commit is contained in:
Joakim Frostegård 2020-07-24 20:36:46 +02:00
parent 3802dec79e
commit 228511b3aa
6 changed files with 15 additions and 13 deletions

2
Cargo.lock generated
View file

@ -62,8 +62,8 @@ dependencies = [
"aquatic_cli_helpers", "aquatic_cli_helpers",
"aquatic_common", "aquatic_common",
"aquatic_http_protocol", "aquatic_http_protocol",
"crossbeam-channel",
"either", "either",
"flume",
"hashbrown", "hashbrown",
"indexmap", "indexmap",
"itoa", "itoa",

View file

@ -18,8 +18,8 @@ anyhow = "1"
aquatic_cli_helpers = { path = "../aquatic_cli_helpers" } aquatic_cli_helpers = { path = "../aquatic_cli_helpers" }
aquatic_common = { path = "../aquatic_common" } aquatic_common = { path = "../aquatic_common" }
aquatic_http_protocol = { path = "../aquatic_http_protocol" } aquatic_http_protocol = { path = "../aquatic_http_protocol" }
crossbeam-channel = "0.4"
either = "1" either = "1"
flume = "0.7"
hashbrown = "0.8" hashbrown = "0.8"
indexmap = "1" indexmap = "1"
itoa = "0.4" itoa = "0.4"

View file

@ -2,7 +2,7 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use either::Either; use either::Either;
use flume::{Sender, Receiver}; use crossbeam_channel::{Sender, Receiver};
use hashbrown::HashMap; use hashbrown::HashMap;
use indexmap::IndexMap; use indexmap::IndexMap;
use log::error; use log::error;

View file

@ -15,7 +15,6 @@ use crate::common::*;
use crate::config::Config; use crate::config::Config;
pub fn run_request_worker( pub fn run_request_worker(
config: Config, config: Config,
state: State, state: State,
@ -101,6 +100,8 @@ pub fn handle_announce_requests(
request_sender_meta.peer_addr.ip() request_sender_meta.peer_addr.ip()
); );
::log::debug!("peer ip: {:?}", peer_ip);
let response = match peer_ip { let response = match peer_ip {
IpAddr::V4(peer_ip_address) => { IpAddr::V4(peer_ip_address) => {
let torrent_data: &mut TorrentData<Ipv4Addr> = torrent_maps.ipv4 let torrent_data: &mut TorrentData<Ipv4Addr> = torrent_maps.ipv4

View file

@ -22,7 +22,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let state = State::default(); let state = State::default();
let (request_channel_sender, request_channel_receiver) = ::flume::unbounded(); let (request_channel_sender, request_channel_receiver) = ::crossbeam_channel::unbounded();
let mut out_message_senders = Vec::new(); let mut out_message_senders = Vec::new();
@ -42,7 +42,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let request_channel_sender = request_channel_sender.clone(); let request_channel_sender = request_channel_sender.clone();
let opt_tls_acceptor = opt_tls_acceptor.clone(); let opt_tls_acceptor = opt_tls_acceptor.clone();
let (response_channel_sender, response_channel_receiver) = ::flume::unbounded(); let (response_channel_sender, response_channel_receiver) = ::crossbeam_channel::unbounded();
out_message_senders.push(response_channel_sender); out_message_senders.push(response_channel_sender);

View file

@ -22,6 +22,9 @@ use connection::*;
use utils::*; use utils::*;
const CONNECTION_CLEAN_INTERVAL: usize = 2 ^ 22;
pub fn run_socket_worker( pub fn run_socket_worker(
config: Config, config: Config,
socket_worker_index: usize, socket_worker_index: usize,
@ -108,18 +111,16 @@ pub fn run_poll_loop(
} }
} }
let response_drain = response_channel_receiver.drain(); if !(local_responses.is_empty() & (response_channel_receiver.is_empty())) {
if !(local_responses.is_empty() & (response_drain.len() == 0)) {
send_responses( send_responses(
local_responses.drain(..), local_responses.drain(..),
response_drain, response_channel_receiver.try_iter(),
&mut connections &mut connections
); );
} }
// Remove inactive connections, but not every iteration // Remove inactive connections, but not every iteration
if iter_counter % 32768 == 0 { if iter_counter % CONNECTION_CLEAN_INTERVAL == 0 {
remove_inactive_connections(&mut connections); remove_inactive_connections(&mut connections);
} }
@ -301,10 +302,10 @@ pub fn handle_connection_read_event(
/// Read responses from channel, send to peers /// Read responses from channel, send to peers
pub fn send_responses( pub fn send_responses(
local_responses: Drain<(ConnectionMeta, Response)>, local_responses: Drain<(ConnectionMeta, Response)>,
response_channel_receiver: ::flume::Drain<(ConnectionMeta, Response)>, channel_responses: crossbeam_channel::TryIter<(ConnectionMeta, Response)>,
connections: &mut ConnectionMap, connections: &mut ConnectionMap,
){ ){
for (meta, response) in local_responses.chain(response_channel_receiver){ for (meta, response) in local_responses.chain(channel_responses){
if let Some(established) = connections.get_mut(&meta.poll_token) if let Some(established) = connections.get_mut(&meta.poll_token)
.and_then(Connection::get_established) .and_then(Connection::get_established)
{ {