mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-01 18:25:30 +00:00
udp: add config key worker_channel_size to enable bounded channels
This commit is contained in:
parent
abea88dbf8
commit
2fe57b9f67
3 changed files with 33 additions and 8 deletions
|
|
@ -5,7 +5,7 @@ use std::sync::atomic::AtomicUsize;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use crossbeam_channel::Sender;
|
use crossbeam_channel::{Sender, TrySendError};
|
||||||
|
|
||||||
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap};
|
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap};
|
||||||
use aquatic_common::AHashIndexMap;
|
use aquatic_common::AHashIndexMap;
|
||||||
|
|
@ -88,8 +88,14 @@ impl ConnectedRequestSender {
|
||||||
request: ConnectedRequest,
|
request: ConnectedRequest,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
) {
|
) {
|
||||||
if let Err(err) = self.senders[index.0].try_send((self.index, request, addr)) {
|
match self.senders[index.0].try_send((self.index, request, addr)) {
|
||||||
::log::warn!("request_sender.try_send failed: {:?}", err)
|
Ok(()) => {},
|
||||||
|
Err(TrySendError::Full(_)) => {
|
||||||
|
::log::error!("Request channel {} is full, dropping request. Try increasing number of request workers or raising config.worker_channel_size.", index.0)
|
||||||
|
}
|
||||||
|
Err(TrySendError::Disconnected(_)) => {
|
||||||
|
panic!("Request channel {} is disconnected", index.0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -109,8 +115,14 @@ impl ConnectedResponseSender {
|
||||||
response: ConnectedResponse,
|
response: ConnectedResponse,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
) {
|
) {
|
||||||
if let Err(err) = self.senders[index.0].try_send((response, addr)) {
|
match self.senders[index.0].try_send((response, addr)) {
|
||||||
::log::warn!("request_sender.try_send failed: {:?}", err)
|
Ok(()) => {},
|
||||||
|
Err(TrySendError::Full(_)) => {
|
||||||
|
::log::error!("Response channel {} is full, dropping response. Try increasing number of socket workers or raising config.worker_channel_size.", index.0)
|
||||||
|
}
|
||||||
|
Err(TrySendError::Disconnected(_)) => {
|
||||||
|
panic!("Response channel {} is disconnected", index.0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,10 @@ pub struct Config {
|
||||||
/// generate responses and send them back to the socket workers.
|
/// generate responses and send them back to the socket workers.
|
||||||
pub request_workers: usize,
|
pub request_workers: usize,
|
||||||
pub log_level: LogLevel,
|
pub log_level: LogLevel,
|
||||||
|
/// Maximum number of items in each channel passing requests/responses
|
||||||
|
/// between workers. A value of zero means that the channel will be of
|
||||||
|
/// unbounded size.
|
||||||
|
pub worker_channel_size: usize,
|
||||||
pub network: NetworkConfig,
|
pub network: NetworkConfig,
|
||||||
pub protocol: ProtocolConfig,
|
pub protocol: ProtocolConfig,
|
||||||
pub handlers: HandlerConfig,
|
pub handlers: HandlerConfig,
|
||||||
|
|
@ -33,6 +37,7 @@ impl Default for Config {
|
||||||
socket_workers: 1,
|
socket_workers: 1,
|
||||||
request_workers: 1,
|
request_workers: 1,
|
||||||
log_level: LogLevel::Error,
|
log_level: LogLevel::Error,
|
||||||
|
worker_channel_size: 0,
|
||||||
network: NetworkConfig::default(),
|
network: NetworkConfig::default(),
|
||||||
protocol: ProtocolConfig::default(),
|
protocol: ProtocolConfig::default(),
|
||||||
handlers: HandlerConfig::default(),
|
handlers: HandlerConfig::default(),
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,7 @@ use anyhow::Context;
|
||||||
#[cfg(feature = "cpu-pinning")]
|
#[cfg(feature = "cpu-pinning")]
|
||||||
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
|
use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex};
|
||||||
use aquatic_common::privileges::drop_privileges_after_socket_binding;
|
use aquatic_common::privileges::drop_privileges_after_socket_binding;
|
||||||
use crossbeam_channel::unbounded;
|
use crossbeam_channel::{bounded, unbounded};
|
||||||
|
|
||||||
use aquatic_common::access_list::update_access_list;
|
use aquatic_common::access_list::update_access_list;
|
||||||
use signal_hook::consts::SIGUSR1;
|
use signal_hook::consts::SIGUSR1;
|
||||||
|
|
@ -40,14 +40,22 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
||||||
let mut response_receivers = BTreeMap::new();
|
let mut response_receivers = BTreeMap::new();
|
||||||
|
|
||||||
for i in 0..config.request_workers {
|
for i in 0..config.request_workers {
|
||||||
let (request_sender, request_receiver) = unbounded();
|
let (request_sender, request_receiver) = if config.worker_channel_size == 0 {
|
||||||
|
unbounded()
|
||||||
|
} else {
|
||||||
|
bounded(config.worker_channel_size)
|
||||||
|
};
|
||||||
|
|
||||||
request_senders.push(request_sender);
|
request_senders.push(request_sender);
|
||||||
request_receivers.insert(i, request_receiver);
|
request_receivers.insert(i, request_receiver);
|
||||||
}
|
}
|
||||||
|
|
||||||
for i in 0..config.socket_workers {
|
for i in 0..config.socket_workers {
|
||||||
let (response_sender, response_receiver) = unbounded();
|
let (response_sender, response_receiver) = if config.worker_channel_size == 0 {
|
||||||
|
unbounded()
|
||||||
|
} else {
|
||||||
|
bounded(config.worker_channel_size)
|
||||||
|
};
|
||||||
|
|
||||||
response_senders.push(response_sender);
|
response_senders.push(response_sender);
|
||||||
response_receivers.insert(i, response_receiver);
|
response_receivers.insert(i, response_receiver);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue