From fb2794643d358756095f017875d2d19581e89629 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 4 Jul 2022 11:49:39 +0200 Subject: [PATCH 1/7] udp, common: rename request workers to swarm workers --- aquatic_common/src/cpu_pinning.rs | 4 +-- aquatic_http/src/lib.rs | 2 +- aquatic_udp/src/common.rs | 22 ++++++++-------- aquatic_udp/src/config.rs | 12 ++++----- aquatic_udp/src/lib.rs | 26 +++++++++---------- aquatic_udp/src/workers/mod.rs | 2 +- aquatic_udp/src/workers/socket/requests.rs | 7 +++-- aquatic_udp/src/workers/socket/storage.rs | 18 ++++++------- .../src/workers/{request => swarm}/mod.rs | 4 +-- .../src/workers/{request => swarm}/storage.rs | 0 aquatic_udp_bench/src/main.rs | 8 +++--- aquatic_ws/src/lib.rs | 2 +- 12 files changed, 53 insertions(+), 54 deletions(-) rename aquatic_udp/src/workers/{request => swarm}/mod.rs (98%) rename aquatic_udp/src/workers/{request => swarm}/storage.rs (100%) diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index 57335f0..1358a1c 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -90,7 +90,7 @@ pub mod mod_name { #[derive(Clone, Copy, Debug)] pub enum WorkerIndex { SocketWorker(usize), - RequestWorker(usize), + SwarmWorker(usize), Util, } @@ -104,7 +104,7 @@ impl WorkerIndex { ) -> usize { let ascending_index = match self { Self::SocketWorker(index) => config.core_offset() + index, - Self::RequestWorker(index) => config.core_offset() + socket_workers + index, + Self::SwarmWorker(index) => config.core_offset() + socket_workers + index, Self::Util => config.core_offset() + socket_workers + request_workers, }; diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 83f81dd..1faba55 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -91,7 +91,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { &config.cpu_pinning, config.socket_workers, config.request_workers, - WorkerIndex::RequestWorker(i), + WorkerIndex::SwarmWorker(i), )?; let builder = LocalExecutorBuilder::new(placement).name("request"); diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 0bd6f11..8880dfe 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -43,11 +43,11 @@ pub enum ConnectedResponse { pub struct SocketWorkerIndex(pub usize); #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] -pub struct RequestWorkerIndex(pub usize); +pub struct SwarmWorkerIndex(pub usize); -impl RequestWorkerIndex { +impl SwarmWorkerIndex { pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self { - Self(info_hash.0[0] as usize % config.request_workers) + Self(info_hash.0[0] as usize % config.swarm_workers) } } @@ -66,14 +66,14 @@ impl ConnectedRequestSender { pub fn try_send_to( &self, - index: RequestWorkerIndex, + index: SwarmWorkerIndex, request: ConnectedRequest, addr: CanonicalSocketAddr, ) { match self.senders[index.0].try_send((self.index, request, addr)) { Ok(()) => {} Err(TrySendError::Full(_)) => { - ::log::error!("Request channel {} is full, dropping request. Try increasing number of request workers or raising config.worker_channel_size.", index.0) + ::log::error!("Request channel {} is full, dropping request. Try increasing number of swarm workers or raising config.worker_channel_size.", index.0) } Err(TrySendError::Disconnected(_)) => { panic!("Request channel {} is disconnected", index.0); @@ -145,7 +145,7 @@ pub struct Statistics { } impl Statistics { - pub fn new(num_request_workers: usize) -> Self { + pub fn new(num_swarm_workers: usize) -> Self { Self { requests_received: Default::default(), responses_sent_connect: Default::default(), @@ -154,8 +154,8 @@ impl Statistics { responses_sent_error: Default::default(), bytes_received: Default::default(), bytes_sent: Default::default(), - torrents: Self::create_atomic_usize_vec(num_request_workers), - peers: Self::create_atomic_usize_vec(num_request_workers), + torrents: Self::create_atomic_usize_vec(num_swarm_workers), + peers: Self::create_atomic_usize_vec(num_swarm_workers), } } @@ -174,11 +174,11 @@ pub struct State { } impl State { - pub fn new(num_request_workers: usize) -> Self { + pub fn new(num_swarm_workers: usize) -> Self { Self { access_list: Arc::new(AccessListArcSwap::default()), - statistics_ipv4: Arc::new(Statistics::new(num_request_workers)), - statistics_ipv6: Arc::new(Statistics::new(num_request_workers)), + statistics_ipv4: Arc::new(Statistics::new(num_swarm_workers)), + statistics_ipv6: Arc::new(Statistics::new(num_swarm_workers)), } } } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 858a70a..eadcf5d 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -11,18 +11,18 @@ use aquatic_toml_config::TomlConfig; #[serde(default)] pub struct Config { /// Socket workers receive requests from the socket, parse them and send - /// them on to the request workers. They then receive responses from the - /// request workers, encode them and send them back over the socket. + /// them on to the swarm workers. They then receive responses from the + /// swarm workers, encode them and send them back over the socket. pub socket_workers: usize, - /// Request workers receive a number of requests from socket workers, + /// Swarm workers receive a number of requests from socket workers, /// generate responses and send them back to the socket workers. - pub request_workers: usize, + pub swarm_workers: usize, pub log_level: LogLevel, /// Maximum number of items in each channel passing requests/responses /// between workers. A value of zero means that the channel will be of /// unbounded size. pub worker_channel_size: usize, - /// How long to block waiting for requests in request workers. Higher + /// How long to block waiting for requests in swarm workers. Higher /// values means that with zero traffic, the worker will not unnecessarily /// cause the CPU to wake up as often. However, high values (something like /// larger than 1000) combined with very low traffic can cause delays @@ -42,7 +42,7 @@ impl Default for Config { fn default() -> Self { Self { socket_workers: 1, - request_workers: 1, + swarm_workers: 1, log_level: LogLevel::Error, worker_channel_size: 0, request_channel_recv_timeout_ms: 100, diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs index aa66e11..815d69a 100644 --- a/aquatic_udp/src/lib.rs +++ b/aquatic_udp/src/lib.rs @@ -17,7 +17,7 @@ use aquatic_common::privileges::PrivilegeDropper; use aquatic_common::PanicSentinelWatcher; use common::{ - ConnectedRequestSender, ConnectedResponseSender, RequestWorkerIndex, SocketWorkerIndex, State, + ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, SwarmWorkerIndex, }; use config::Config; use workers::socket::validator::ConnectionValidator; @@ -28,7 +28,7 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new([SIGUSR1, SIGTERM])?; - let state = State::new(config.request_workers); + let state = State::new(config.swarm_workers); let connection_validator = ConnectionValidator::new(&config)?; let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); @@ -41,7 +41,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut response_senders = Vec::new(); let mut response_receivers = BTreeMap::new(); - for i in 0..config.request_workers { + for i in 0..config.swarm_workers { let (request_sender, request_receiver) = if config.worker_channel_size == 0 { unbounded() } else { @@ -63,7 +63,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { response_receivers.insert(i, response_receiver); } - for i in 0..config.request_workers { + for i in 0..config.swarm_workers { let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); @@ -71,26 +71,26 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let response_sender = ConnectedResponseSender::new(response_senders.clone()); Builder::new() - .name(format!("request-{:02}", i + 1)) + .name(format!("swarm-{:02}", i + 1)) .spawn(move || { #[cfg(feature = "cpu-pinning")] pin_current_if_configured_to( &config.cpu_pinning, config.socket_workers, - config.request_workers, - WorkerIndex::RequestWorker(i), + config.swarm_workers, + WorkerIndex::SwarmWorker(i), ); - workers::request::run_request_worker( + workers::swarm::run_swarm_worker( sentinel, config, state, request_receiver, response_sender, - RequestWorkerIndex(i), + SwarmWorkerIndex(i), ) }) - .with_context(|| "spawn request worker")?; + .with_context(|| "spawn swarm worker")?; } for i in 0..config.socket_workers { @@ -110,7 +110,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { pin_current_if_configured_to( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::SocketWorker(i), ); @@ -140,7 +140,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { pin_current_if_configured_to( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::Util, ); @@ -153,7 +153,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { pin_current_if_configured_to( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::Util, ); diff --git a/aquatic_udp/src/workers/mod.rs b/aquatic_udp/src/workers/mod.rs index 668fd84..5446a1f 100644 --- a/aquatic_udp/src/workers/mod.rs +++ b/aquatic_udp/src/workers/mod.rs @@ -1,3 +1,3 @@ -pub mod request; pub mod socket; pub mod statistics; +pub mod swarm; diff --git a/aquatic_udp/src/workers/socket/requests.rs b/aquatic_udp/src/workers/socket/requests.rs index 779f080..eb995bd 100644 --- a/aquatic_udp/src/workers/socket/requests.rs +++ b/aquatic_udp/src/workers/socket/requests.rs @@ -127,8 +127,7 @@ fn handle_request( .load() .allows(access_list_mode, &request.info_hash.0) { - let worker_index = - RequestWorkerIndex::from_info_hash(config, request.info_hash); + let worker_index = SwarmWorkerIndex::from_info_hash(config, request.info_hash); request_sender.try_send_to( worker_index, @@ -153,9 +152,9 @@ fn handle_request( pending_scrape_valid_until, ); - for (request_worker_index, request) in split_requests { + for (swarm_worker_index, request) in split_requests { request_sender.try_send_to( - request_worker_index, + swarm_worker_index, ConnectedRequest::Scrape(request), src, ); diff --git a/aquatic_udp/src/workers/socket/storage.rs b/aquatic_udp/src/workers/socket/storage.rs index 4717daf..ee5ec54 100644 --- a/aquatic_udp/src/workers/socket/storage.rs +++ b/aquatic_udp/src/workers/socket/storage.rs @@ -27,9 +27,9 @@ impl PendingScrapeResponseSlab { config: &Config, request: ScrapeRequest, valid_until: ValidUntil, - ) -> impl IntoIterator { - let capacity = config.request_workers.min(request.info_hashes.len()); - let mut split_requests: HashMap = + ) -> impl IntoIterator { + let capacity = config.swarm_workers.min(request.info_hashes.len()); + let mut split_requests: HashMap = HashMap::with_capacity(capacity); if request.info_hashes.is_empty() { @@ -45,7 +45,7 @@ impl PendingScrapeResponseSlab { for (i, info_hash) in request.info_hashes.into_iter().enumerate() { let split_request = split_requests - .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) + .entry(SwarmWorkerIndex::from_info_hash(&config, info_hash)) .or_insert_with(|| PendingScrapeRequest { slab_key, info_hashes: BTreeMap::new(), @@ -128,15 +128,15 @@ mod tests { #[quickcheck] fn test_pending_scrape_response_slab( request_data: Vec<(i32, i64, u8)>, - request_workers: u8, + swarm_workers: u8, ) -> TestResult { - if request_workers == 0 { + if swarm_workers == 0 { return TestResult::discard(); } let mut config = Config::default(); - config.request_workers = request_workers as usize; + config.swarm_workers = swarm_workers as usize; let valid_until = ValidUntil::new(1); @@ -175,7 +175,7 @@ mod tests { all_split_requests.push( split_requests .into_iter() - .collect::>(), + .collect::>(), ); } @@ -185,7 +185,7 @@ mod tests { for split_requests in all_split_requests { for (worker_index, split_request) in split_requests { - assert!(worker_index.0 < request_workers as usize); + assert!(worker_index.0 < swarm_workers as usize); let torrent_stats = split_request .info_hashes diff --git a/aquatic_udp/src/workers/request/mod.rs b/aquatic_udp/src/workers/swarm/mod.rs similarity index 98% rename from aquatic_udp/src/workers/request/mod.rs rename to aquatic_udp/src/workers/swarm/mod.rs index 1b8335a..b2cc7ed 100644 --- a/aquatic_udp/src/workers/request/mod.rs +++ b/aquatic_udp/src/workers/swarm/mod.rs @@ -17,13 +17,13 @@ use crate::config::Config; use storage::{Peer, TorrentMap, TorrentMaps}; -pub fn run_request_worker( +pub fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, state: State, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, response_sender: ConnectedResponseSender, - worker_index: RequestWorkerIndex, + worker_index: SwarmWorkerIndex, ) { let mut torrents = TorrentMaps::default(); let mut rng = SmallRng::from_entropy(); diff --git a/aquatic_udp/src/workers/request/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs similarity index 100% rename from aquatic_udp/src/workers/request/storage.rs rename to aquatic_udp/src/workers/swarm/storage.rs diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 89ca6da..3a7c256 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -8,7 +8,7 @@ //! ``` use aquatic_common::PanicSentinelWatcher; -use aquatic_udp::workers::request::run_request_worker; +use aquatic_udp::workers::swarm::run_swarm_worker; use crossbeam_channel::unbounded; use num_format::{Locale, ToFormattedString}; use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng}; @@ -53,16 +53,16 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { { let config = aquatic_config.clone(); - let state = State::new(config.request_workers); + let state = State::new(config.swarm_workers); ::std::thread::spawn(move || { - run_request_worker( + run_swarm_worker( sentinel, config, state, request_receiver, response_sender, - RequestWorkerIndex(0), + SwarmWorkerIndex(0), ) }); } diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index b6ffc22..e08b5c5 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -93,7 +93,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { &config.cpu_pinning, config.socket_workers, config.request_workers, - WorkerIndex::RequestWorker(i), + WorkerIndex::SwarmWorker(i), )?; let builder = LocalExecutorBuilder::new(placement).name("request"); From c89406179bd48dceb96fe9fad1a20ad0dc3b7640 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 4 Jul 2022 11:52:51 +0200 Subject: [PATCH 2/7] http, http_private: rename request workers to swarm workers --- aquatic_http/src/config.rs | 10 +++++----- aquatic_http/src/lib.rs | 12 ++++++------ aquatic_http/src/workers/mod.rs | 2 +- aquatic_http/src/workers/socket.rs | 6 +++--- aquatic_http/src/workers/{request.rs => swarm.rs} | 6 +++--- aquatic_http_private/src/common.rs | 2 +- aquatic_http_private/src/config.rs | 10 +++++----- aquatic_http_private/src/lib.rs | 8 +++----- aquatic_http_private/src/workers/mod.rs | 2 +- aquatic_http_private/src/workers/socket/routes.rs | 4 ++-- .../src/workers/{request => swarm}/common.rs | 0 .../src/workers/{request => swarm}/mod.rs | 2 +- 12 files changed, 31 insertions(+), 33 deletions(-) rename aquatic_http/src/workers/{request.rs => swarm.rs} (98%) rename aquatic_http_private/src/workers/{request => swarm}/common.rs (100%) rename aquatic_http_private/src/workers/{request => swarm}/mod.rs (99%) diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index d3cceaa..033ab35 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -14,12 +14,12 @@ use aquatic_common::cli::LogLevel; #[serde(default)] pub struct Config { /// Socket workers receive requests from the socket, parse them and send - /// them on to the request workers. They then receive responses from the - /// request workers, encode them and send them back over the socket. + /// them on to the swarm workers. They then receive responses from the + /// swarm workers, encode them and send them back over the socket. pub socket_workers: usize, - /// Request workers receive a number of requests from socket workers, + /// Swarm workers receive a number of requests from socket workers, /// generate responses and send them back to the socket workers. - pub request_workers: usize, + pub swarm_workers: usize, pub log_level: LogLevel, pub network: NetworkConfig, pub protocol: ProtocolConfig, @@ -33,7 +33,7 @@ impl Default for Config { fn default() -> Self { Self { socket_workers: 1, - request_workers: 1, + swarm_workers: 1, log_level: LogLevel::default(), network: NetworkConfig::default(), protocol: ProtocolConfig::default(), diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 1faba55..328ada9 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -34,7 +34,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { update_access_list(&config.access_list, &state.access_list)?; - let num_peers = config.socket_workers + config.request_workers; + let num_peers = config.socket_workers + config.swarm_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -59,7 +59,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let placement = get_worker_placement( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::SocketWorker(i), )?; let builder = LocalExecutorBuilder::new(placement).name("socket"); @@ -81,7 +81,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { executors.push(executor); } - for i in 0..(config.request_workers) { + for i in 0..(config.swarm_workers) { let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); @@ -90,14 +90,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let placement = get_worker_placement( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::SwarmWorker(i), )?; let builder = LocalExecutorBuilder::new(placement).name("request"); let executor = builder .spawn(move || async move { - workers::request::run_request_worker(sentinel, config, state, request_mesh_builder) + workers::swarm::run_swarm_worker(sentinel, config, state, request_mesh_builder) .await }) .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; @@ -109,7 +109,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { set_affinity_for_util_worker( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, )?; } diff --git a/aquatic_http/src/workers/mod.rs b/aquatic_http/src/workers/mod.rs index 63fc0ec..28ef095 100644 --- a/aquatic_http/src/workers/mod.rs +++ b/aquatic_http/src/workers/mod.rs @@ -1,2 +1,2 @@ -pub mod request; pub mod socket; +pub mod swarm; diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index 37e0b16..f47bb33 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -264,10 +264,10 @@ impl Connection { /// Take a request and: /// - Update connection ValidUntil /// - Return error response if request is not allowed - /// - If it is an announce request, send it to request workers an await a + /// - If it is an announce request, send it to swarm workers an await a /// response /// - If it is a scrape requests, split it up, pass on the parts to - /// relevant request workers and await a response + /// relevant swarm workers and await a response async fn handle_request(&mut self, request: Request) -> anyhow::Result { if let Ok(mut slab) = self.connection_slab.try_borrow_mut() { if let Some(reference) = slab.get_mut(self.connection_id.0) { @@ -448,7 +448,7 @@ impl Connection { } fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { - (info_hash.0[0] as usize) % config.request_workers + (info_hash.0[0] as usize) % config.swarm_workers } fn create_tcp_listener( diff --git a/aquatic_http/src/workers/request.rs b/aquatic_http/src/workers/swarm.rs similarity index 98% rename from aquatic_http/src/workers/request.rs rename to aquatic_http/src/workers/swarm.rs index 099298a..c952938 100644 --- a/aquatic_http/src/workers/request.rs +++ b/aquatic_http/src/workers/swarm.rs @@ -157,7 +157,7 @@ impl TorrentMaps { } } -pub async fn run_request_worker( +pub async fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, state: State, @@ -235,7 +235,7 @@ async fn handle_request_stream( ); if let Err(err) = response_sender.connect().await.send(response).await { - ::log::error!("request worker could not send announce response: {:#}", err); + ::log::error!("swarm worker could not send announce response: {:#}", err); } } ChannelRequest::Scrape { @@ -247,7 +247,7 @@ async fn handle_request_stream( handle_scrape_request(&config, &mut torrents.borrow_mut(), peer_addr, request); if let Err(err) = response_sender.connect().await.send(response).await { - ::log::error!("request worker could not send scrape response: {:#}", err); + ::log::error!("swarm worker could not send scrape response: {:#}", err); } } }; diff --git a/aquatic_http_private/src/common.rs b/aquatic_http_private/src/common.rs index 092d09a..b1b247b 100644 --- a/aquatic_http_private/src/common.rs +++ b/aquatic_http_private/src/common.rs @@ -17,7 +17,7 @@ pub struct RequestWorkerIndex(pub usize); impl RequestWorkerIndex { pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self { - Self(info_hash.0[0] as usize % config.request_workers) + Self(info_hash.0[0] as usize % config.swarm_workers) } } diff --git a/aquatic_http_private/src/config.rs b/aquatic_http_private/src/config.rs index b800cc8..43e8e2c 100644 --- a/aquatic_http_private/src/config.rs +++ b/aquatic_http_private/src/config.rs @@ -11,12 +11,12 @@ use aquatic_common::cli::LogLevel; #[serde(default)] pub struct Config { /// Socket workers receive requests from the socket, parse them and send - /// them on to the request workers. They then receive responses from the - /// request workers, encode them and send them back over the socket. + /// them on to the swarm workers. They then receive responses from the + /// swarm workers, encode them and send them back over the socket. pub socket_workers: usize, - /// Request workers receive a number of requests from socket workers, + /// Swarm workers receive a number of requests from socket workers, /// generate responses and send them back to the socket workers. - pub request_workers: usize, + pub swarm_workers: usize, pub worker_channel_size: usize, /// Number of database connections to establish in each socket worker pub db_connections_per_worker: u32, @@ -31,7 +31,7 @@ impl Default for Config { fn default() -> Self { Self { socket_workers: 1, - request_workers: 1, + swarm_workers: 1, worker_channel_size: 128, db_connections_per_worker: 4, log_level: LogLevel::default(), diff --git a/aquatic_http_private/src/lib.rs b/aquatic_http_private/src/lib.rs index 88609e1..e10ee83 100644 --- a/aquatic_http_private/src/lib.rs +++ b/aquatic_http_private/src/lib.rs @@ -30,7 +30,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { let mut request_senders = Vec::new(); let mut request_receivers = VecDeque::new(); - for _ in 0..config.request_workers { + for _ in 0..config.swarm_workers { let (request_sender, request_receiver) = channel(config.worker_channel_size); request_senders.push(request_sender); @@ -64,16 +64,14 @@ pub fn run(config: Config) -> anyhow::Result<()> { handles.push(handle); } - for _ in 0..config.request_workers { + for _ in 0..config.swarm_workers { let sentinel = sentinel.clone(); let config = config.clone(); let request_receiver = request_receivers.pop_front().unwrap(); let handle = ::std::thread::Builder::new() .name("request".into()) - .spawn(move || { - workers::request::run_request_worker(sentinel, config, request_receiver) - })?; + .spawn(move || workers::swarm::run_swarm_worker(sentinel, config, request_receiver))?; handles.push(handle); } diff --git a/aquatic_http_private/src/workers/mod.rs b/aquatic_http_private/src/workers/mod.rs index 63fc0ec..28ef095 100644 --- a/aquatic_http_private/src/workers/mod.rs +++ b/aquatic_http_private/src/workers/mod.rs @@ -1,2 +1,2 @@ -pub mod request; pub mod socket; +pub mod swarm; diff --git a/aquatic_http_private/src/workers/socket/routes.rs b/aquatic_http_private/src/workers/socket/routes.rs index 8fd139b..1d83f29 100644 --- a/aquatic_http_private/src/workers/socket/routes.rs +++ b/aquatic_http_private/src/workers/socket/routes.rs @@ -33,7 +33,7 @@ pub async fn announce( let request = AnnounceRequest::from_query_string(&query) .map_err(|_| FailureResponse::new("Malformed request"))?; - let request_worker_index = RequestWorkerIndex::from_info_hash(&config, request.info_hash); + let swarm_worker_index = RequestWorkerIndex::from_info_hash(&config, request.info_hash); let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned()); let source_addr = CanonicalSocketAddr::new(source_addr); @@ -43,7 +43,7 @@ pub async fn announce( .await?; let response_receiver = request_sender - .send_to(request_worker_index, validated_request, source_addr) + .send_to(swarm_worker_index, validated_request, source_addr) .await .map_err(|err| internal_error(format!("Sending request over channel failed: {:#}", err)))?; diff --git a/aquatic_http_private/src/workers/request/common.rs b/aquatic_http_private/src/workers/swarm/common.rs similarity index 100% rename from aquatic_http_private/src/workers/request/common.rs rename to aquatic_http_private/src/workers/swarm/common.rs diff --git a/aquatic_http_private/src/workers/request/mod.rs b/aquatic_http_private/src/workers/swarm/mod.rs similarity index 99% rename from aquatic_http_private/src/workers/request/mod.rs rename to aquatic_http_private/src/workers/swarm/mod.rs index c684256..121c34d 100644 --- a/aquatic_http_private/src/workers/request/mod.rs +++ b/aquatic_http_private/src/workers/swarm/mod.rs @@ -21,7 +21,7 @@ use crate::config::Config; use common::*; -pub fn run_request_worker( +pub fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, request_receiver: Receiver, From 224d50e98b495944baabf653c9d6d806ee52194d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 4 Jul 2022 11:53:42 +0200 Subject: [PATCH 3/7] common: do more request to swarm worker renames --- aquatic_common/src/cpu_pinning.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index 1358a1c..cd0d219 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -99,13 +99,13 @@ impl WorkerIndex { &self, config: &C, socket_workers: usize, - request_workers: usize, + swarm_workers: usize, num_cores: usize, ) -> usize { let ascending_index = match self { Self::SocketWorker(index) => config.core_offset() + index, Self::SwarmWorker(index) => config.core_offset() + socket_workers + index, - Self::Util => config.core_offset() + socket_workers + request_workers, + Self::Util => config.core_offset() + socket_workers + swarm_workers, }; let max_core_index = num_cores - 1; @@ -153,13 +153,13 @@ pub mod glommio { fn get_worker_cpu_set( config: &C, socket_workers: usize, - request_workers: usize, + swarm_workers: usize, worker_index: WorkerIndex, ) -> anyhow::Result { let num_cpu_cores = get_num_cpu_cores()?; let core_index = - worker_index.get_core_index(config, socket_workers, request_workers, num_cpu_cores); + worker_index.get_core_index(config, socket_workers, swarm_workers, num_cpu_cores); let too_many_workers = match (&config.hyperthread(), &config.direction()) { ( @@ -223,12 +223,11 @@ pub mod glommio { pub fn get_worker_placement( config: &C, socket_workers: usize, - request_workers: usize, + swarm_workers: usize, worker_index: WorkerIndex, ) -> anyhow::Result { if config.active() { - let cpu_set = - get_worker_cpu_set(config, socket_workers, request_workers, worker_index)?; + let cpu_set = get_worker_cpu_set(config, socket_workers, swarm_workers, worker_index)?; Ok(Placement::Fenced(cpu_set)) } else { @@ -239,10 +238,10 @@ pub mod glommio { pub fn set_affinity_for_util_worker( config: &C, socket_workers: usize, - request_workers: usize, + swarm_workers: usize, ) -> anyhow::Result<()> { let worker_cpu_set = - get_worker_cpu_set(config, socket_workers, request_workers, WorkerIndex::Util)?; + get_worker_cpu_set(config, socket_workers, swarm_workers, WorkerIndex::Util)?; unsafe { let mut set: libc::cpu_set_t = ::std::mem::zeroed(); @@ -275,7 +274,7 @@ pub mod glommio { pub fn pin_current_if_configured_to( config: &C, socket_workers: usize, - request_workers: usize, + swarm_workers: usize, worker_index: WorkerIndex, ) { use hwloc::{CpuSet, ObjectType, Topology, CPUBIND_THREAD}; @@ -293,7 +292,7 @@ pub fn pin_current_if_configured_to( let num_cores = core_cpu_sets.len(); let core_index = - worker_index.get_core_index(config, socket_workers, request_workers, num_cores); + worker_index.get_core_index(config, socket_workers, swarm_workers, num_cores); let cpu_set = core_cpu_sets .get(core_index) From 12fc8bcf1ecab087a39ec6fab3c0b1a06e160960 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 4 Jul 2022 11:55:53 +0200 Subject: [PATCH 4/7] ws: rename request workers to swarm workers --- aquatic_ws/src/config.rs | 10 +++++----- aquatic_ws/src/lib.rs | 12 ++++++------ aquatic_ws/src/workers/mod.rs | 2 +- aquatic_ws/src/workers/socket.rs | 6 +++--- aquatic_ws/src/workers/{request.rs => swarm.rs} | 6 +++--- 5 files changed, 18 insertions(+), 18 deletions(-) rename aquatic_ws/src/workers/{request.rs => swarm.rs} (98%) diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index fbfe8bf..136ab9f 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -13,12 +13,12 @@ use aquatic_toml_config::TomlConfig; #[serde(default)] pub struct Config { /// Socket workers receive requests from the socket, parse them and send - /// them on to the request workers. They then receive responses from the - /// request workers, encode them and send them back over the socket. + /// them on to the swarm workers. They then receive responses from the + /// swarm workers, encode them and send them back over the socket. pub socket_workers: usize, - /// Request workers receive a number of requests from socket workers, + /// Swarm workers receive a number of requests from socket workers, /// generate responses and send them back to the socket workers. - pub request_workers: usize, + pub swarm_workers: usize, pub log_level: LogLevel, pub network: NetworkConfig, pub protocol: ProtocolConfig, @@ -32,7 +32,7 @@ impl Default for Config { fn default() -> Self { Self { socket_workers: 1, - request_workers: 1, + swarm_workers: 1, log_level: LogLevel::default(), network: NetworkConfig::default(), protocol: ProtocolConfig::default(), diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index e08b5c5..daa77ed 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -32,7 +32,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { update_access_list(&config.access_list, &state.access_list)?; - let num_peers = config.socket_workers + config.request_workers; + let num_peers = config.socket_workers + config.swarm_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); @@ -59,7 +59,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let placement = get_worker_placement( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::SocketWorker(i), )?; let builder = LocalExecutorBuilder::new(placement).name("socket"); @@ -82,7 +82,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { executors.push(executor); } - for i in 0..(config.request_workers) { + for i in 0..(config.swarm_workers) { let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); @@ -92,14 +92,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let placement = get_worker_placement( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::SwarmWorker(i), )?; let builder = LocalExecutorBuilder::new(placement).name("request"); let executor = builder .spawn(move || async move { - workers::request::run_request_worker( + workers::swarm::run_swarm_worker( sentinel, config, state, @@ -117,7 +117,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { set_affinity_for_util_worker( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, )?; } diff --git a/aquatic_ws/src/workers/mod.rs b/aquatic_ws/src/workers/mod.rs index 63fc0ec..28ef095 100644 --- a/aquatic_ws/src/workers/mod.rs +++ b/aquatic_ws/src/workers/mod.rs @@ -1,2 +1,2 @@ -pub mod request; pub mod socket; +pub mod swarm; diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index cfda49a..42dfb5f 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -352,7 +352,7 @@ impl ConnectionReader { ) .await .unwrap(); - ::log::info!("sent message to request worker"); + ::log::info!("sent message to swarm worker"); } else { self.send_error_response("Info hash not allowed".into(), Some(info_hash)) .await?; @@ -404,7 +404,7 @@ impl ConnectionReader { .send_to(consumer_index, (meta, in_message)) .await .unwrap(); - ::log::info!("sent message to request worker"); + ::log::info!("sent message to swarm worker"); } } } @@ -541,7 +541,7 @@ impl ConnectionWriter { } fn calculate_in_message_consumer_index(config: &Config, info_hash: InfoHash) -> usize { - (info_hash.0[0] as usize) % config.request_workers + (info_hash.0[0] as usize) % config.swarm_workers } fn create_tcp_listener( diff --git a/aquatic_ws/src/workers/request.rs b/aquatic_ws/src/workers/swarm.rs similarity index 98% rename from aquatic_ws/src/workers/request.rs rename to aquatic_ws/src/workers/swarm.rs index 92f07ad..87cdd48 100644 --- a/aquatic_ws/src/workers/request.rs +++ b/aquatic_ws/src/workers/swarm.rs @@ -127,7 +127,7 @@ impl TorrentMaps { } } -pub async fn run_request_worker( +pub async fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, state: State, @@ -223,14 +223,14 @@ async fn handle_request_stream( }; for (meta, out_message) in out_messages.drain(..) { - ::log::info!("request worker trying to send OutMessage to socket worker"); + ::log::info!("swarm worker trying to send OutMessage to socket worker"); out_message_senders .send_to(meta.out_message_consumer_id.0, (meta, out_message)) .await .expect("failed sending out_message to socket worker"); - ::log::info!("request worker sent OutMessage to socket worker"); + ::log::info!("swarm worker sent OutMessage to socket worker"); } }, ) From 7fb68ef52d003bd4561932e5c9ff76878ee8b8d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 4 Jul 2022 11:59:21 +0200 Subject: [PATCH 5/7] README and architecture svg: rename request worker to swarm worker --- README.md | 10 +++++----- documents/aquatic-architecture-2022-02-02.svg | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 000e02b..dd277a1 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ respective configuration files. #### Workers To increase performance, number of worker threads can be increased. The sum of -`socket_workers` and `request_workers` should equal the total number of CPU cores +`socket_workers` and `swarm_workers` should equal the total number of CPU cores that you want to use. Recommended proportions: @@ -92,7 +92,7 @@ that you want to use. Recommended proportions: - + @@ -180,7 +180,7 @@ This is the most mature of the implementations. I consider it ready for producti ![UDP BitTorrent tracker throughput comparison](./documents/aquatic-udp-load-test-illustration-2021-11-28.png) -More details are available [here](./documents/aquatic-udp-load-test-2021-11-28.pdf). +More details are available [here](./documents/aquatic-udp-load-test-2021-11-28.pdf). Please note that request workers have been renamed to swarm workers. #### Optimisation attempts that didn't work out @@ -213,7 +213,7 @@ fine. ![HTTP BitTorrent tracker throughput comparison](./documents/aquatic-http-load-test-illustration-2022-04-11.png) -More details are available [here](./documents/aquatic-http-load-test-2022-04-11.pdf). +More details are available [here](./documents/aquatic-http-load-test-2022-04-11.pdf). Please note that request workers have been renamed to swarm workers. ### aquatic_ws: WebTorrent tracker @@ -231,7 +231,7 @@ fine. ![WebTorrent tracker throughput comparison](./documents/aquatic-ws-load-test-illustration-2022-03-29.png) -More details are available [here](./documents/aquatic-ws-load-test-2022-03-29.pdf). +More details are available [here](./documents/aquatic-ws-load-test-2022-03-29.pdf). Please note that request workers have been renamed to swarm workers. ## Load testing diff --git a/documents/aquatic-architecture-2022-02-02.svg b/documents/aquatic-architecture-2022-02-02.svg index 58fc08c..4d00454 100644 --- a/documents/aquatic-architecture-2022-02-02.svg +++ b/documents/aquatic-architecture-2022-02-02.svg @@ -1,3 +1,3 @@ -
Socket worker
Socket worker
Socket worker
Socket worker
Responses
Responses
Socket worker
Socket worker
Requests
Requests
Socket
Socket
Request worker
Request worker
Request worker
Request worker
Socket
Socket
Socket
Socket
Responses
Responses
Sharded torrent state
Sharded to...
Sharded torrent state
Sharded to...
  • Establish connections, or in the case of aquatic_udp, validate source IPs
  • Receive and parse requests from peers
  • Run access list checks
  • Send on announce requests to the responsible request worker
  • Split scrape requests to match state shards and send them to the request workers
  • Receive responses from the request workers, serialize them and send them to peers

Establish connections, or in the case of aqua...
  • Receive announce and scrape requests from the socket workers
  • Update torrent state if appropriate, generate responses
  • Send responses back to the sending socket workers

Receive announce and scrape requests from the...
Connection state
Connection...
Connection state
Connection...
Connection state
Connection...
Requests
Requests
Text is not SVG - cannot display
\ No newline at end of file +
Socket worker
Socket worker
Socket worker
Socket worker
Responses
Responses
Socket worker
Socket worker
Requests
Requests
Socket
Socket
Swarm worker
Swarm worker
Swarm worker
Swarm worker
Socket
Socket
Socket
Socket
Responses
Responses
Sharded torrent state
Sharded to...
Sharded torrent state
Sharded to...
  • Establish connections, or in the case of aquatic_udp, validate source IPs
  • Receive and parse requests from peers
  • Run access list checks
  • Send on announce requests to the responsible swarm worker
  • Split scrape requests to match state shards and send them to the swarm workers
  • Receive responses from the swarm workers, serialize them and send them to peers

Establish connections, or in the case of aqua...
  • Receive announce and scrape requests from the socket workers
  • Update torrent state if appropriate, generate responses
  • Send responses back to the sending socket workers

Receive announce and scrape requests from the...
Connection state
Connection...
Connection state
Connection...
Connection state
Connection...
Requests
Requests
Text is not SVG - cannot display
\ No newline at end of file From a7837ebffbcbc522f092754e782ffca60fa63342 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 4 Jul 2022 12:01:53 +0200 Subject: [PATCH 6/7] Update TODO --- TODO.md | 1 - 1 file changed, 1 deletion(-) diff --git a/TODO.md b/TODO.md index c34f091..fa3d7e9 100644 --- a/TODO.md +++ b/TODO.md @@ -7,7 +7,6 @@ ## Medium priority -* rename request workers to swarm workers * quit whole program if any thread panics * But it would be nice not to panic in workers, but to return errors instead. Once JoinHandle::is_finished is available in stable Rust (#90470), an From 4ef51937a8b7882aa6f17177d2f1a09025d0e31c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 4 Jul 2022 12:22:00 +0200 Subject: [PATCH 7/7] udp: fix rename (request worker to swarm worker) --- aquatic_udp/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index eadcf5d..10cddd1 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -182,7 +182,7 @@ pub struct CleaningConfig { pub max_connection_age: u32, /// Remove peers who have not announced for this long (seconds) pub max_peer_age: u64, - /// Remove pending scrape responses that have not been returned from request + /// Remove pending scrape responses that have not been returned from swarm /// workers for this long (seconds) pub max_pending_scrape_age: u64, }
>=20
Request workersSwarm workers 1 2 3