diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index 57335f0..1358a1c 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -90,7 +90,7 @@ pub mod mod_name { #[derive(Clone, Copy, Debug)] pub enum WorkerIndex { SocketWorker(usize), - RequestWorker(usize), + SwarmWorker(usize), Util, } @@ -104,7 +104,7 @@ impl WorkerIndex { ) -> usize { let ascending_index = match self { Self::SocketWorker(index) => config.core_offset() + index, - Self::RequestWorker(index) => config.core_offset() + socket_workers + index, + Self::SwarmWorker(index) => config.core_offset() + socket_workers + index, Self::Util => config.core_offset() + socket_workers + request_workers, }; diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 83f81dd..1faba55 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -91,7 +91,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { &config.cpu_pinning, config.socket_workers, config.request_workers, - WorkerIndex::RequestWorker(i), + WorkerIndex::SwarmWorker(i), )?; let builder = LocalExecutorBuilder::new(placement).name("request"); diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 0bd6f11..8880dfe 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -43,11 +43,11 @@ pub enum ConnectedResponse { pub struct SocketWorkerIndex(pub usize); #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] -pub struct RequestWorkerIndex(pub usize); +pub struct SwarmWorkerIndex(pub usize); -impl RequestWorkerIndex { +impl SwarmWorkerIndex { pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self { - Self(info_hash.0[0] as usize % config.request_workers) + Self(info_hash.0[0] as usize % config.swarm_workers) } } @@ -66,14 +66,14 @@ impl ConnectedRequestSender { pub fn try_send_to( &self, - index: RequestWorkerIndex, + index: SwarmWorkerIndex, request: ConnectedRequest, addr: CanonicalSocketAddr, ) { match self.senders[index.0].try_send((self.index, request, addr)) { Ok(()) => {} Err(TrySendError::Full(_)) => { - ::log::error!("Request channel {} is full, dropping request. Try increasing number of request workers or raising config.worker_channel_size.", index.0) + ::log::error!("Request channel {} is full, dropping request. Try increasing number of swarm workers or raising config.worker_channel_size.", index.0) } Err(TrySendError::Disconnected(_)) => { panic!("Request channel {} is disconnected", index.0); @@ -145,7 +145,7 @@ pub struct Statistics { } impl Statistics { - pub fn new(num_request_workers: usize) -> Self { + pub fn new(num_swarm_workers: usize) -> Self { Self { requests_received: Default::default(), responses_sent_connect: Default::default(), @@ -154,8 +154,8 @@ impl Statistics { responses_sent_error: Default::default(), bytes_received: Default::default(), bytes_sent: Default::default(), - torrents: Self::create_atomic_usize_vec(num_request_workers), - peers: Self::create_atomic_usize_vec(num_request_workers), + torrents: Self::create_atomic_usize_vec(num_swarm_workers), + peers: Self::create_atomic_usize_vec(num_swarm_workers), } } @@ -174,11 +174,11 @@ pub struct State { } impl State { - pub fn new(num_request_workers: usize) -> Self { + pub fn new(num_swarm_workers: usize) -> Self { Self { access_list: Arc::new(AccessListArcSwap::default()), - statistics_ipv4: Arc::new(Statistics::new(num_request_workers)), - statistics_ipv6: Arc::new(Statistics::new(num_request_workers)), + statistics_ipv4: Arc::new(Statistics::new(num_swarm_workers)), + statistics_ipv6: Arc::new(Statistics::new(num_swarm_workers)), } } } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 858a70a..eadcf5d 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -11,18 +11,18 @@ use aquatic_toml_config::TomlConfig; #[serde(default)] pub struct Config { /// Socket workers receive requests from the socket, parse them and send - /// them on to the request workers. They then receive responses from the - /// request workers, encode them and send them back over the socket. + /// them on to the swarm workers. They then receive responses from the + /// swarm workers, encode them and send them back over the socket. pub socket_workers: usize, - /// Request workers receive a number of requests from socket workers, + /// Swarm workers receive a number of requests from socket workers, /// generate responses and send them back to the socket workers. - pub request_workers: usize, + pub swarm_workers: usize, pub log_level: LogLevel, /// Maximum number of items in each channel passing requests/responses /// between workers. A value of zero means that the channel will be of /// unbounded size. pub worker_channel_size: usize, - /// How long to block waiting for requests in request workers. Higher + /// How long to block waiting for requests in swarm workers. Higher /// values means that with zero traffic, the worker will not unnecessarily /// cause the CPU to wake up as often. However, high values (something like /// larger than 1000) combined with very low traffic can cause delays @@ -42,7 +42,7 @@ impl Default for Config { fn default() -> Self { Self { socket_workers: 1, - request_workers: 1, + swarm_workers: 1, log_level: LogLevel::Error, worker_channel_size: 0, request_channel_recv_timeout_ms: 100, diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index aa66e11..815d69a 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -17,7 +17,7 @@ use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::PanicSentinelWatcher; use common::{ - ConnectedRequestSender, ConnectedResponseSender, RequestWorkerIndex, SocketWorkerIndex, State, + ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, SwarmWorkerIndex, }; use config::Config; use workers::socket::validator::ConnectionValidator; @@ -28,7 +28,7 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new([SIGUSR1, SIGTERM])?; - let state = State::new(config.request_workers); + let state = State::new(config.swarm_workers); let connection_validator = ConnectionValidator::new(&config)?; let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); @@ -41,7 +41,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut response_senders = Vec::new(); let mut response_receivers = BTreeMap::new(); - for i in 0..config.request_workers { + for i in 0..config.swarm_workers { let (request_sender, request_receiver) = if config.worker_channel_size == 0 { unbounded() } else { @@ -63,7 +63,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { response_receivers.insert(i, response_receiver); } - for i in 0..config.request_workers { + for i in 0..config.swarm_workers { let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); @@ -71,26 +71,26 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let response_sender = ConnectedResponseSender::new(response_senders.clone()); Builder::new() - .name(format!("request-{:02}", i + 1)) + .name(format!("swarm-{:02}", i + 1)) .spawn(move || { #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( &config.cpu_pinning, config.socket_workers, - config.request_workers, - WorkerIndex::RequestWorker(i), + config.swarm_workers, + WorkerIndex::SwarmWorker(i), ); - workers::request::run_request_worker( + workers::swarm::run_swarm_worker( sentinel, config, state, request_receiver, response_sender, - RequestWorkerIndex(i), + SwarmWorkerIndex(i), ) }) - .with_context(|| "spawn request worker")?; + .with_context(|| "spawn swarm worker")?; } for i in 0..config.socket_workers { @@ -110,7 +110,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { pin_current_if_configured_to( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::SocketWorker(i), ); @@ -140,7 +140,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { pin_current_if_configured_to( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::Util, ); @@ -153,7 +153,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { pin_current_if_configured_to( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::Util, ); diff --git a/aquatic_udp/src/workers/mod.rs b/aquatic_udp/src/workers/mod.rs index 668fd84..5446a1f 100644 --- a/aquatic_udp/src/workers/mod.rs +++ b/aquatic_udp/src/workers/mod.rs @@ -1,3 +1,3 @@ -pub mod request; pub mod socket; pub mod statistics; +pub mod swarm; diff --git a/aquatic_udp/src/workers/socket/requests.rs b/aquatic_udp/src/workers/socket/requests.rs index 779f080..eb995bd 100644 --- a/aquatic_udp/src/workers/socket/requests.rs +++ b/aquatic_udp/src/workers/socket/requests.rs @@ -127,8 +127,7 @@ fn handle_request( .load() .allows(access_list_mode, &request.info_hash.0) { - let worker_index = - RequestWorkerIndex::from_info_hash(config, request.info_hash); + let worker_index = SwarmWorkerIndex::from_info_hash(config, request.info_hash); request_sender.try_send_to( worker_index, @@ -153,9 +152,9 @@ fn handle_request( pending_scrape_valid_until, ); - for (request_worker_index, request) in split_requests { + for (swarm_worker_index, request) in split_requests { request_sender.try_send_to( - request_worker_index, + swarm_worker_index, ConnectedRequest::Scrape(request), src, ); diff --git a/aquatic_udp/src/workers/socket/storage.rs b/aquatic_udp/src/workers/socket/storage.rs index 4717daf..ee5ec54 100644 --- a/aquatic_udp/src/workers/socket/storage.rs +++ b/aquatic_udp/src/workers/socket/storage.rs @@ -27,9 +27,9 @@ impl PendingScrapeResponseSlab { config: &Config, request: ScrapeRequest, valid_until: ValidUntil, - ) -> impl IntoIterator { - let capacity = config.request_workers.min(request.info_hashes.len()); - let mut split_requests: HashMap = + ) -> impl IntoIterator { + let capacity = config.swarm_workers.min(request.info_hashes.len()); + let mut split_requests: HashMap = HashMap::with_capacity(capacity); if request.info_hashes.is_empty() { @@ -45,7 +45,7 @@ impl PendingScrapeResponseSlab { for (i, info_hash) in request.info_hashes.into_iter().enumerate() { let split_request = split_requests - .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) + .entry(SwarmWorkerIndex::from_info_hash(&config, info_hash)) .or_insert_with(|| PendingScrapeRequest { slab_key, info_hashes: BTreeMap::new(), @@ -128,15 +128,15 @@ mod tests { #[quickcheck] fn test_pending_scrape_response_slab( request_data: Vec<(i32, i64, u8)>, - request_workers: u8, + swarm_workers: u8, ) -> TestResult { - if request_workers == 0 { + if swarm_workers == 0 { return TestResult::discard(); } let mut config = Config::default(); - config.request_workers = request_workers as usize; + config.swarm_workers = swarm_workers as usize; let valid_until = ValidUntil::new(1); @@ -175,7 +175,7 @@ mod tests { all_split_requests.push( split_requests .into_iter() - .collect::>(), + .collect::>(), ); } @@ -185,7 +185,7 @@ mod tests { for split_requests in all_split_requests { for (worker_index, split_request) in split_requests { - assert!(worker_index.0 < request_workers as usize); + assert!(worker_index.0 < swarm_workers as usize); let torrent_stats = split_request .info_hashes diff --git a/aquatic_udp/src/workers/request/mod.rs b/aquatic_udp/src/workers/swarm/mod.rs similarity index 98% rename from aquatic_udp/src/workers/request/mod.rs rename to aquatic_udp/src/workers/swarm/mod.rs index 1b8335a..b2cc7ed 100644 --- a/aquatic_udp/src/workers/request/mod.rs +++ b/aquatic_udp/src/workers/swarm/mod.rs @@ -17,13 +17,13 @@ use crate::config::Config; use storage::{Peer, TorrentMap, TorrentMaps}; -pub fn run_request_worker( +pub fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, state: State, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, response_sender: ConnectedResponseSender, - worker_index: RequestWorkerIndex, + worker_index: SwarmWorkerIndex, ) { let mut torrents = TorrentMaps::default(); let mut rng = SmallRng::from_entropy(); diff --git a/aquatic_udp/src/workers/request/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs similarity index 100% rename from aquatic_udp/src/workers/request/storage.rs rename to aquatic_udp/src/workers/swarm/storage.rs diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 89ca6da..3a7c256 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -8,7 +8,7 @@ //! ``` use aquatic_common::PanicSentinelWatcher; -use aquatic_udp::workers::request::run_request_worker; +use aquatic_udp::workers::swarm::run_swarm_worker; use crossbeam_channel::unbounded; use num_format::{Locale, ToFormattedString}; use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng}; @@ -53,16 +53,16 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { { let config = aquatic_config.clone(); - let state = State::new(config.request_workers); + let state = State::new(config.swarm_workers); ::std::thread::spawn(move || { - run_request_worker( + run_swarm_worker( sentinel, config, state, request_receiver, response_sender, - RequestWorkerIndex(0), + SwarmWorkerIndex(0), ) }); } diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index b6ffc22..e08b5c5 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -93,7 +93,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { &config.cpu_pinning, config.socket_workers, config.request_workers, - WorkerIndex::RequestWorker(i), + WorkerIndex::SwarmWorker(i), )?; let builder = LocalExecutorBuilder::new(placement).name("request");