From c89406179bd48dceb96fe9fad1a20ad0dc3b7640 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 4 Jul 2022 11:52:51 +0200 Subject: [PATCH] http, http_private: rename request workers to swarm workers --- aquatic_http/src/config.rs | 10 +++++----- aquatic_http/src/lib.rs | 12 ++++++------ aquatic_http/src/workers/mod.rs | 2 +- aquatic_http/src/workers/socket.rs | 6 +++--- aquatic_http/src/workers/{request.rs => swarm.rs} | 6 +++--- aquatic_http_private/src/common.rs | 2 +- aquatic_http_private/src/config.rs | 10 +++++----- aquatic_http_private/src/lib.rs | 8 +++----- aquatic_http_private/src/workers/mod.rs | 2 +- aquatic_http_private/src/workers/socket/routes.rs | 4 ++-- .../src/workers/{request => swarm}/common.rs | 0 .../src/workers/{request => swarm}/mod.rs | 2 +- 12 files changed, 31 insertions(+), 33 deletions(-) rename aquatic_http/src/workers/{request.rs => swarm.rs} (98%) rename aquatic_http_private/src/workers/{request => swarm}/common.rs (100%) rename aquatic_http_private/src/workers/{request => swarm}/mod.rs (99%) diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index d3cceaa..033ab35 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -14,12 +14,12 @@ use aquatic_common::cli::LogLevel; #[serde(default)] pub struct Config { /// Socket workers receive requests from the socket, parse them and send - /// them on to the request workers. They then receive responses from the - /// request workers, encode them and send them back over the socket. + /// them on to the swarm workers. They then receive responses from the + /// swarm workers, encode them and send them back over the socket. pub socket_workers: usize, - /// Request workers receive a number of requests from socket workers, + /// Swarm workers receive a number of requests from socket workers, /// generate responses and send them back to the socket workers. - pub request_workers: usize, + pub swarm_workers: usize, pub log_level: LogLevel, pub network: NetworkConfig, pub protocol: ProtocolConfig, @@ -33,7 +33,7 @@ impl Default for Config { fn default() -> Self { Self { socket_workers: 1, - request_workers: 1, + swarm_workers: 1, log_level: LogLevel::default(), network: NetworkConfig::default(), protocol: ProtocolConfig::default(), diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs index 1faba55..328ada9 100644 --- a/aquatic_http/src/lib.rs +++ b/aquatic_http/src/lib.rs @@ -34,7 +34,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { update_access_list(&config.access_list, &state.access_list)?; - let num_peers = config.socket_workers + config.request_workers; + let num_peers = config.socket_workers + config.swarm_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -59,7 +59,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let placement = get_worker_placement( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::SocketWorker(i), )?; let builder = LocalExecutorBuilder::new(placement).name("socket"); @@ -81,7 +81,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { executors.push(executor); } - for i in 0..(config.request_workers) { + for i in 0..(config.swarm_workers) { let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); @@ -90,14 +90,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let placement = get_worker_placement( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::SwarmWorker(i), )?; let builder = LocalExecutorBuilder::new(placement).name("request"); let executor = builder .spawn(move || async move { - workers::request::run_request_worker(sentinel, config, state, request_mesh_builder) + workers::swarm::run_swarm_worker(sentinel, config, state, request_mesh_builder) .await }) .map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?; @@ -109,7 +109,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { set_affinity_for_util_worker( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, )?; } diff --git a/aquatic_http/src/workers/mod.rs b/aquatic_http/src/workers/mod.rs index 63fc0ec..28ef095 100644 --- a/aquatic_http/src/workers/mod.rs +++ b/aquatic_http/src/workers/mod.rs @@ -1,2 +1,2 @@ -pub mod request; pub mod socket; +pub mod swarm; diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index 37e0b16..f47bb33 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -264,10 +264,10 @@ impl Connection { /// Take a request and: /// - Update connection ValidUntil /// - Return error response if request is not allowed - /// - If it is an announce request, send it to request workers an await a + /// - If it is an announce request, send it to swarm workers an await a /// response /// - If it is a scrape requests, split it up, pass on the parts to - /// relevant request workers and await a response + /// relevant swarm workers and await a response async fn handle_request(&mut self, request: Request) -> anyhow::Result { if let Ok(mut slab) = self.connection_slab.try_borrow_mut() { if let Some(reference) = slab.get_mut(self.connection_id.0) { @@ -448,7 +448,7 @@ impl Connection { } fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { - (info_hash.0[0] as usize) % config.request_workers + (info_hash.0[0] as usize) % config.swarm_workers } fn create_tcp_listener( diff --git a/aquatic_http/src/workers/request.rs b/aquatic_http/src/workers/swarm.rs similarity index 98% rename from aquatic_http/src/workers/request.rs rename to aquatic_http/src/workers/swarm.rs index 099298a..c952938 100644 --- a/aquatic_http/src/workers/request.rs +++ b/aquatic_http/src/workers/swarm.rs @@ -157,7 +157,7 @@ impl TorrentMaps { } } -pub async fn run_request_worker( +pub async fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, state: State, @@ -235,7 +235,7 @@ async fn handle_request_stream( ); if let Err(err) = response_sender.connect().await.send(response).await { - ::log::error!("request worker could not send announce response: {:#}", err); + ::log::error!("swarm worker could not send announce response: {:#}", err); } } ChannelRequest::Scrape { @@ -247,7 +247,7 @@ async fn handle_request_stream( handle_scrape_request(&config, &mut torrents.borrow_mut(), peer_addr, request); if let Err(err) = response_sender.connect().await.send(response).await { - ::log::error!("request worker could not send scrape response: {:#}", err); + ::log::error!("swarm worker could not send scrape response: {:#}", err); } } }; diff --git a/aquatic_http_private/src/common.rs b/aquatic_http_private/src/common.rs index 092d09a..b1b247b 100644 --- a/aquatic_http_private/src/common.rs +++ b/aquatic_http_private/src/common.rs @@ -17,7 +17,7 @@ pub struct RequestWorkerIndex(pub usize); impl RequestWorkerIndex { pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self { - Self(info_hash.0[0] as usize % config.request_workers) + Self(info_hash.0[0] as usize % config.swarm_workers) } } diff --git a/aquatic_http_private/src/config.rs b/aquatic_http_private/src/config.rs index b800cc8..43e8e2c 100644 --- a/aquatic_http_private/src/config.rs +++ b/aquatic_http_private/src/config.rs @@ -11,12 +11,12 @@ use aquatic_common::cli::LogLevel; #[serde(default)] pub struct Config { /// Socket workers receive requests from the socket, parse them and send - /// them on to the request workers. They then receive responses from the - /// request workers, encode them and send them back over the socket. + /// them on to the swarm workers. They then receive responses from the + /// swarm workers, encode them and send them back over the socket. pub socket_workers: usize, - /// Request workers receive a number of requests from socket workers, + /// Swarm workers receive a number of requests from socket workers, /// generate responses and send them back to the socket workers. - pub request_workers: usize, + pub swarm_workers: usize, pub worker_channel_size: usize, /// Number of database connections to establish in each socket worker pub db_connections_per_worker: u32, @@ -31,7 +31,7 @@ impl Default for Config { fn default() -> Self { Self { socket_workers: 1, - request_workers: 1, + swarm_workers: 1, worker_channel_size: 128, db_connections_per_worker: 4, log_level: LogLevel::default(), diff --git a/aquatic_http_private/src/lib.rs b/aquatic_http_private/src/lib.rs index 88609e1..e10ee83 100644 --- a/aquatic_http_private/src/lib.rs +++ b/aquatic_http_private/src/lib.rs @@ -30,7 +30,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { let mut request_senders = Vec::new(); let mut request_receivers = VecDeque::new(); - for _ in 0..config.request_workers { + for _ in 0..config.swarm_workers { let (request_sender, request_receiver) = channel(config.worker_channel_size); request_senders.push(request_sender); @@ -64,16 +64,14 @@ pub fn run(config: Config) -> anyhow::Result<()> { handles.push(handle); } - for _ in 0..config.request_workers { + for _ in 0..config.swarm_workers { let sentinel = sentinel.clone(); let config = config.clone(); let request_receiver = request_receivers.pop_front().unwrap(); let handle = ::std::thread::Builder::new() .name("request".into()) - .spawn(move || { - workers::request::run_request_worker(sentinel, config, request_receiver) - })?; + .spawn(move || workers::swarm::run_swarm_worker(sentinel, config, request_receiver))?; handles.push(handle); } diff --git a/aquatic_http_private/src/workers/mod.rs b/aquatic_http_private/src/workers/mod.rs index 63fc0ec..28ef095 100644 --- a/aquatic_http_private/src/workers/mod.rs +++ b/aquatic_http_private/src/workers/mod.rs @@ -1,2 +1,2 @@ -pub mod request; pub mod socket; +pub mod swarm; diff --git a/aquatic_http_private/src/workers/socket/routes.rs b/aquatic_http_private/src/workers/socket/routes.rs index 8fd139b..1d83f29 100644 --- a/aquatic_http_private/src/workers/socket/routes.rs +++ b/aquatic_http_private/src/workers/socket/routes.rs @@ -33,7 +33,7 @@ pub async fn announce( let request = AnnounceRequest::from_query_string(&query) .map_err(|_| FailureResponse::new("Malformed request"))?; - let request_worker_index = RequestWorkerIndex::from_info_hash(&config, request.info_hash); + let swarm_worker_index = RequestWorkerIndex::from_info_hash(&config, request.info_hash); let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned()); let source_addr = CanonicalSocketAddr::new(source_addr); @@ -43,7 +43,7 @@ pub async fn announce( .await?; let response_receiver = request_sender - .send_to(request_worker_index, validated_request, source_addr) + .send_to(swarm_worker_index, validated_request, source_addr) .await .map_err(|err| internal_error(format!("Sending request over channel failed: {:#}", err)))?; diff --git a/aquatic_http_private/src/workers/request/common.rs b/aquatic_http_private/src/workers/swarm/common.rs similarity index 100% rename from aquatic_http_private/src/workers/request/common.rs rename to aquatic_http_private/src/workers/swarm/common.rs diff --git a/aquatic_http_private/src/workers/request/mod.rs b/aquatic_http_private/src/workers/swarm/mod.rs similarity index 99% rename from aquatic_http_private/src/workers/request/mod.rs rename to aquatic_http_private/src/workers/swarm/mod.rs index c684256..121c34d 100644 --- a/aquatic_http_private/src/workers/request/mod.rs +++ b/aquatic_http_private/src/workers/swarm/mod.rs @@ -21,7 +21,7 @@ use crate::config::Config; use common::*; -pub fn run_request_worker( +pub fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, request_receiver: Receiver,