From 2fe57b9f67eba71da141688c591568fd7baee33d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 28 Nov 2021 21:59:11 +0100 Subject: [PATCH] udp: add config key worker_channel_size to enable bounded channels --- aquatic_udp/src/common.rs | 22 +++++++++++++++++----- aquatic_udp/src/config.rs | 5 +++++ aquatic_udp/src/lib.rs | 14 +++++++++++--- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index b0f9f2e..085a82a 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -5,7 +5,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Instant; -use crossbeam_channel::Sender; +use crossbeam_channel::{Sender, TrySendError}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap}; use aquatic_common::AHashIndexMap; @@ -88,8 +88,14 @@ impl ConnectedRequestSender { request: ConnectedRequest, addr: SocketAddr, ) { - if let Err(err) = self.senders[index.0].try_send((self.index, request, addr)) { - ::log::warn!("request_sender.try_send failed: {:?}", err) + match self.senders[index.0].try_send((self.index, request, addr)) { + Ok(()) => {}, + Err(TrySendError::Full(_)) => { + ::log::error!("Request channel {} is full, dropping request. Try increasing number of request workers or raising config.worker_channel_size.", index.0) + } + Err(TrySendError::Disconnected(_)) => { + panic!("Request channel {} is disconnected", index.0); + } } } } @@ -109,8 +115,14 @@ impl ConnectedResponseSender { response: ConnectedResponse, addr: SocketAddr, ) { - if let Err(err) = self.senders[index.0].try_send((response, addr)) { - ::log::warn!("request_sender.try_send failed: {:?}", err) + match self.senders[index.0].try_send((response, addr)) { + Ok(()) => {}, + Err(TrySendError::Full(_)) => { + ::log::error!("Response channel {} is full, dropping response. Try increasing number of socket workers or raising config.worker_channel_size.", index.0) + } + Err(TrySendError::Disconnected(_)) => { + panic!("Response channel {} is disconnected", index.0); + } } } } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 35f2e0a..721a43a 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -16,6 +16,10 @@ pub struct Config { /// generate responses and send them back to the socket workers. pub request_workers: usize, pub log_level: LogLevel, + /// Maximum number of items in each channel passing requests/responses + /// between workers. A value of zero means that the channel will be of + /// unbounded size. + pub worker_channel_size: usize, pub network: NetworkConfig, pub protocol: ProtocolConfig, pub handlers: HandlerConfig, @@ -33,6 +37,7 @@ impl Default for Config { socket_workers: 1, request_workers: 1, log_level: LogLevel::Error, + worker_channel_size: 0, network: NetworkConfig::default(), protocol: ProtocolConfig::default(), handlers: HandlerConfig::default(), diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index 1e7e604..3c77751 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -12,7 +12,7 @@ use anyhow::Context; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; use aquatic_common::privileges::drop_privileges_after_socket_binding; -use crossbeam_channel::unbounded; +use crossbeam_channel::{bounded, unbounded}; use aquatic_common::access_list::update_access_list; use signal_hook::consts::SIGUSR1; @@ -40,14 +40,22 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut response_receivers = BTreeMap::new(); for i in 0..config.request_workers { - let (request_sender, request_receiver) = unbounded(); + let (request_sender, request_receiver) = if config.worker_channel_size == 0 { + unbounded() + } else { + bounded(config.worker_channel_size) + }; request_senders.push(request_sender); request_receivers.insert(i, request_receiver); } for i in 0..config.socket_workers { - let (response_sender, response_receiver) = unbounded(); + let (response_sender, response_receiver) = if config.worker_channel_size == 0 { + unbounded() + } else { + bounded(config.worker_channel_size) + }; response_senders.push(response_sender); response_receivers.insert(i, response_receiver);