diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index fbfe8bf..136ab9f 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -13,12 +13,12 @@ use aquatic_toml_config::TomlConfig; #[serde(default)] pub struct Config { /// Socket workers receive requests from the socket, parse them and send - /// them on to the request workers. They then receive responses from the - /// request workers, encode them and send them back over the socket. + /// them on to the swarm workers. They then receive responses from the + /// swarm workers, encode them and send them back over the socket. pub socket_workers: usize, - /// Request workers receive a number of requests from socket workers, + /// Swarm workers receive a number of requests from socket workers, /// generate responses and send them back to the socket workers. - pub request_workers: usize, + pub swarm_workers: usize, pub log_level: LogLevel, pub network: NetworkConfig, pub protocol: ProtocolConfig, @@ -32,7 +32,7 @@ impl Default for Config { fn default() -> Self { Self { socket_workers: 1, - request_workers: 1, + swarm_workers: 1, log_level: LogLevel::default(), network: NetworkConfig::default(), protocol: ProtocolConfig::default(), diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index e08b5c5..daa77ed 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -32,7 +32,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { update_access_list(&config.access_list, &state.access_list)?; - let num_peers = config.socket_workers + config.request_workers; + let num_peers = config.socket_workers + config.swarm_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); @@ -59,7 +59,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let placement = get_worker_placement( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::SocketWorker(i), )?; let builder = LocalExecutorBuilder::new(placement).name("socket"); @@ -82,7 +82,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { executors.push(executor); } - for i in 0..(config.request_workers) { + for i in 0..(config.swarm_workers) { let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); @@ -92,14 +92,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let placement = get_worker_placement( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, WorkerIndex::SwarmWorker(i), )?; let builder = LocalExecutorBuilder::new(placement).name("request"); let executor = builder .spawn(move || async move { - workers::request::run_request_worker( + workers::swarm::run_swarm_worker( sentinel, config, state, @@ -117,7 +117,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { set_affinity_for_util_worker( &config.cpu_pinning, config.socket_workers, - config.request_workers, + config.swarm_workers, )?; } diff --git a/aquatic_ws/src/workers/mod.rs b/aquatic_ws/src/workers/mod.rs index 63fc0ec..28ef095 100644 --- a/aquatic_ws/src/workers/mod.rs +++ b/aquatic_ws/src/workers/mod.rs @@ -1,2 +1,2 @@ -pub mod request; pub mod socket; +pub mod swarm; diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index cfda49a..42dfb5f 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -352,7 +352,7 @@ impl ConnectionReader { ) .await .unwrap(); - ::log::info!("sent message to request worker"); + ::log::info!("sent message to swarm worker"); } else { self.send_error_response("Info hash not allowed".into(), Some(info_hash)) .await?; @@ -404,7 +404,7 @@ impl ConnectionReader { .send_to(consumer_index, (meta, in_message)) .await .unwrap(); - ::log::info!("sent message to request worker"); + ::log::info!("sent message to swarm worker"); } } } @@ -541,7 +541,7 @@ impl ConnectionWriter { } fn calculate_in_message_consumer_index(config: &Config, info_hash: InfoHash) -> usize { - (info_hash.0[0] as usize) % config.request_workers + (info_hash.0[0] as usize) % config.swarm_workers } fn create_tcp_listener( diff --git a/aquatic_ws/src/workers/request.rs b/aquatic_ws/src/workers/swarm.rs similarity index 98% rename from aquatic_ws/src/workers/request.rs rename to aquatic_ws/src/workers/swarm.rs index 92f07ad..87cdd48 100644 --- a/aquatic_ws/src/workers/request.rs +++ b/aquatic_ws/src/workers/swarm.rs @@ -127,7 +127,7 @@ impl TorrentMaps { } } -pub async fn run_request_worker( +pub async fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, state: State, @@ -223,14 +223,14 @@ async fn handle_request_stream( }; for (meta, out_message) in out_messages.drain(..) { - ::log::info!("request worker trying to send OutMessage to socket worker"); + ::log::info!("swarm worker trying to send OutMessage to socket worker"); out_message_senders .send_to(meta.out_message_consumer_id.0, (meta, out_message)) .await .expect("failed sending out_message to socket worker"); - ::log::info!("request worker sent OutMessage to socket worker"); + ::log::info!("swarm worker sent OutMessage to socket worker"); } }, )