diff --git a/README.md b/README.md
index 000e02b..dd277a1 100644
--- a/README.md
+++ b/README.md
@@ -72,7 +72,7 @@ respective configuration files.
#### Workers
To increase performance, number of worker threads can be increased. The sum of
-`socket_workers` and `request_workers` should equal the total number of CPU cores
+`socket_workers` and `swarm_workers` should equal the total number of CPU cores
that you want to use. Recommended proportions:
@@ -92,7 +92,7 @@ that you want to use. Recommended proportions:
>=20
-
Request workers
+
Swarm workers
1
2
3
@@ -180,7 +180,7 @@ This is the most mature of the implementations. I consider it ready for producti

-More details are available [here](./documents/aquatic-udp-load-test-2021-11-28.pdf).
+More details are available [here](./documents/aquatic-udp-load-test-2021-11-28.pdf). Please note that request workers have been renamed to swarm workers.
#### Optimisation attempts that didn't work out
@@ -213,7 +213,7 @@ fine.

-More details are available [here](./documents/aquatic-http-load-test-2022-04-11.pdf).
+More details are available [here](./documents/aquatic-http-load-test-2022-04-11.pdf). Please note that request workers have been renamed to swarm workers.
### aquatic_ws: WebTorrent tracker
@@ -231,7 +231,7 @@ fine.

-More details are available [here](./documents/aquatic-ws-load-test-2022-03-29.pdf).
+More details are available [here](./documents/aquatic-ws-load-test-2022-03-29.pdf). Please note that request workers have been renamed to swarm workers.
## Load testing
diff --git a/TODO.md b/TODO.md
index c34f091..fa3d7e9 100644
--- a/TODO.md
+++ b/TODO.md
@@ -7,7 +7,6 @@
## Medium priority
-* rename request workers to swarm workers
* quit whole program if any thread panics
* But it would be nice not to panic in workers, but to return errors instead.
Once JoinHandle::is_finished is available in stable Rust (#90470), an
diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs
index 57335f0..cd0d219 100644
--- a/aquatic_common/src/cpu_pinning.rs
+++ b/aquatic_common/src/cpu_pinning.rs
@@ -90,7 +90,7 @@ pub mod mod_name {
#[derive(Clone, Copy, Debug)]
pub enum WorkerIndex {
SocketWorker(usize),
- RequestWorker(usize),
+ SwarmWorker(usize),
Util,
}
@@ -99,13 +99,13 @@ impl WorkerIndex {
&self,
config: &C,
socket_workers: usize,
- request_workers: usize,
+ swarm_workers: usize,
num_cores: usize,
) -> usize {
let ascending_index = match self {
Self::SocketWorker(index) => config.core_offset() + index,
- Self::RequestWorker(index) => config.core_offset() + socket_workers + index,
- Self::Util => config.core_offset() + socket_workers + request_workers,
+ Self::SwarmWorker(index) => config.core_offset() + socket_workers + index,
+ Self::Util => config.core_offset() + socket_workers + swarm_workers,
};
let max_core_index = num_cores - 1;
@@ -153,13 +153,13 @@ pub mod glommio {
fn get_worker_cpu_set(
config: &C,
socket_workers: usize,
- request_workers: usize,
+ swarm_workers: usize,
worker_index: WorkerIndex,
) -> anyhow::Result {
let num_cpu_cores = get_num_cpu_cores()?;
let core_index =
- worker_index.get_core_index(config, socket_workers, request_workers, num_cpu_cores);
+ worker_index.get_core_index(config, socket_workers, swarm_workers, num_cpu_cores);
let too_many_workers = match (&config.hyperthread(), &config.direction()) {
(
@@ -223,12 +223,11 @@ pub mod glommio {
pub fn get_worker_placement(
config: &C,
socket_workers: usize,
- request_workers: usize,
+ swarm_workers: usize,
worker_index: WorkerIndex,
) -> anyhow::Result {
if config.active() {
- let cpu_set =
- get_worker_cpu_set(config, socket_workers, request_workers, worker_index)?;
+ let cpu_set = get_worker_cpu_set(config, socket_workers, swarm_workers, worker_index)?;
Ok(Placement::Fenced(cpu_set))
} else {
@@ -239,10 +238,10 @@ pub mod glommio {
pub fn set_affinity_for_util_worker(
config: &C,
socket_workers: usize,
- request_workers: usize,
+ swarm_workers: usize,
) -> anyhow::Result<()> {
let worker_cpu_set =
- get_worker_cpu_set(config, socket_workers, request_workers, WorkerIndex::Util)?;
+ get_worker_cpu_set(config, socket_workers, swarm_workers, WorkerIndex::Util)?;
unsafe {
let mut set: libc::cpu_set_t = ::std::mem::zeroed();
@@ -275,7 +274,7 @@ pub mod glommio {
pub fn pin_current_if_configured_to(
config: &C,
socket_workers: usize,
- request_workers: usize,
+ swarm_workers: usize,
worker_index: WorkerIndex,
) {
use hwloc::{CpuSet, ObjectType, Topology, CPUBIND_THREAD};
@@ -293,7 +292,7 @@ pub fn pin_current_if_configured_to(
let num_cores = core_cpu_sets.len();
let core_index =
- worker_index.get_core_index(config, socket_workers, request_workers, num_cores);
+ worker_index.get_core_index(config, socket_workers, swarm_workers, num_cores);
let cpu_set = core_cpu_sets
.get(core_index)
diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs
index d3cceaa..033ab35 100644
--- a/aquatic_http/src/config.rs
+++ b/aquatic_http/src/config.rs
@@ -14,12 +14,12 @@ use aquatic_common::cli::LogLevel;
#[serde(default)]
pub struct Config {
/// Socket workers receive requests from the socket, parse them and send
- /// them on to the request workers. They then receive responses from the
- /// request workers, encode them and send them back over the socket.
+ /// them on to the swarm workers. They then receive responses from the
+ /// swarm workers, encode them and send them back over the socket.
pub socket_workers: usize,
- /// Request workers receive a number of requests from socket workers,
+ /// Swarm workers receive a number of requests from socket workers,
/// generate responses and send them back to the socket workers.
- pub request_workers: usize,
+ pub swarm_workers: usize,
pub log_level: LogLevel,
pub network: NetworkConfig,
pub protocol: ProtocolConfig,
@@ -33,7 +33,7 @@ impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
- request_workers: 1,
+ swarm_workers: 1,
log_level: LogLevel::default(),
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),
diff --git a/aquatic_http/src/lib.rs b/aquatic_http/src/lib.rs
index 83f81dd..328ada9 100644
--- a/aquatic_http/src/lib.rs
+++ b/aquatic_http/src/lib.rs
@@ -34,7 +34,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
update_access_list(&config.access_list, &state.access_list)?;
- let num_peers = config.socket_workers + config.request_workers;
+ let num_peers = config.socket_workers + config.swarm_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
@@ -59,7 +59,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
- config.request_workers,
+ config.swarm_workers,
WorkerIndex::SocketWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("socket");
@@ -81,7 +81,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
executors.push(executor);
}
- for i in 0..(config.request_workers) {
+ for i in 0..(config.swarm_workers) {
let sentinel = sentinel.clone();
let config = config.clone();
let state = state.clone();
@@ -90,14 +90,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
- config.request_workers,
- WorkerIndex::RequestWorker(i),
+ config.swarm_workers,
+ WorkerIndex::SwarmWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("request");
let executor = builder
.spawn(move || async move {
- workers::request::run_request_worker(sentinel, config, state, request_mesh_builder)
+ workers::swarm::run_swarm_worker(sentinel, config, state, request_mesh_builder)
.await
})
.map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?;
@@ -109,7 +109,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
set_affinity_for_util_worker(
&config.cpu_pinning,
config.socket_workers,
- config.request_workers,
+ config.swarm_workers,
)?;
}
diff --git a/aquatic_http/src/workers/mod.rs b/aquatic_http/src/workers/mod.rs
index 63fc0ec..28ef095 100644
--- a/aquatic_http/src/workers/mod.rs
+++ b/aquatic_http/src/workers/mod.rs
@@ -1,2 +1,2 @@
-pub mod request;
pub mod socket;
+pub mod swarm;
diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs
index 37e0b16..f47bb33 100644
--- a/aquatic_http/src/workers/socket.rs
+++ b/aquatic_http/src/workers/socket.rs
@@ -264,10 +264,10 @@ impl Connection {
/// Take a request and:
/// - Update connection ValidUntil
/// - Return error response if request is not allowed
- /// - If it is an announce request, send it to request workers an await a
+ /// - If it is an announce request, send it to swarm workers an await a
/// response
/// - If it is a scrape requests, split it up, pass on the parts to
- /// relevant request workers and await a response
+ /// relevant swarm workers and await a response
async fn handle_request(&mut self, request: Request) -> anyhow::Result {
if let Ok(mut slab) = self.connection_slab.try_borrow_mut() {
if let Some(reference) = slab.get_mut(self.connection_id.0) {
@@ -448,7 +448,7 @@ impl Connection {
}
fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize {
- (info_hash.0[0] as usize) % config.request_workers
+ (info_hash.0[0] as usize) % config.swarm_workers
}
fn create_tcp_listener(
diff --git a/aquatic_http/src/workers/request.rs b/aquatic_http/src/workers/swarm.rs
similarity index 98%
rename from aquatic_http/src/workers/request.rs
rename to aquatic_http/src/workers/swarm.rs
index 099298a..c952938 100644
--- a/aquatic_http/src/workers/request.rs
+++ b/aquatic_http/src/workers/swarm.rs
@@ -157,7 +157,7 @@ impl TorrentMaps {
}
}
-pub async fn run_request_worker(
+pub async fn run_swarm_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
@@ -235,7 +235,7 @@ async fn handle_request_stream(
);
if let Err(err) = response_sender.connect().await.send(response).await {
- ::log::error!("request worker could not send announce response: {:#}", err);
+ ::log::error!("swarm worker could not send announce response: {:#}", err);
}
}
ChannelRequest::Scrape {
@@ -247,7 +247,7 @@ async fn handle_request_stream(
handle_scrape_request(&config, &mut torrents.borrow_mut(), peer_addr, request);
if let Err(err) = response_sender.connect().await.send(response).await {
- ::log::error!("request worker could not send scrape response: {:#}", err);
+ ::log::error!("swarm worker could not send scrape response: {:#}", err);
}
}
};
diff --git a/aquatic_http_private/src/common.rs b/aquatic_http_private/src/common.rs
index 092d09a..b1b247b 100644
--- a/aquatic_http_private/src/common.rs
+++ b/aquatic_http_private/src/common.rs
@@ -17,7 +17,7 @@ pub struct RequestWorkerIndex(pub usize);
impl RequestWorkerIndex {
pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self {
- Self(info_hash.0[0] as usize % config.request_workers)
+ Self(info_hash.0[0] as usize % config.swarm_workers)
}
}
diff --git a/aquatic_http_private/src/config.rs b/aquatic_http_private/src/config.rs
index b800cc8..43e8e2c 100644
--- a/aquatic_http_private/src/config.rs
+++ b/aquatic_http_private/src/config.rs
@@ -11,12 +11,12 @@ use aquatic_common::cli::LogLevel;
#[serde(default)]
pub struct Config {
/// Socket workers receive requests from the socket, parse them and send
- /// them on to the request workers. They then receive responses from the
- /// request workers, encode them and send them back over the socket.
+ /// them on to the swarm workers. They then receive responses from the
+ /// swarm workers, encode them and send them back over the socket.
pub socket_workers: usize,
- /// Request workers receive a number of requests from socket workers,
+ /// Swarm workers receive a number of requests from socket workers,
/// generate responses and send them back to the socket workers.
- pub request_workers: usize,
+ pub swarm_workers: usize,
pub worker_channel_size: usize,
/// Number of database connections to establish in each socket worker
pub db_connections_per_worker: u32,
@@ -31,7 +31,7 @@ impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
- request_workers: 1,
+ swarm_workers: 1,
worker_channel_size: 128,
db_connections_per_worker: 4,
log_level: LogLevel::default(),
diff --git a/aquatic_http_private/src/lib.rs b/aquatic_http_private/src/lib.rs
index 88609e1..e10ee83 100644
--- a/aquatic_http_private/src/lib.rs
+++ b/aquatic_http_private/src/lib.rs
@@ -30,7 +30,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let mut request_senders = Vec::new();
let mut request_receivers = VecDeque::new();
- for _ in 0..config.request_workers {
+ for _ in 0..config.swarm_workers {
let (request_sender, request_receiver) = channel(config.worker_channel_size);
request_senders.push(request_sender);
@@ -64,16 +64,14 @@ pub fn run(config: Config) -> anyhow::Result<()> {
handles.push(handle);
}
- for _ in 0..config.request_workers {
+ for _ in 0..config.swarm_workers {
let sentinel = sentinel.clone();
let config = config.clone();
let request_receiver = request_receivers.pop_front().unwrap();
let handle = ::std::thread::Builder::new()
.name("request".into())
- .spawn(move || {
- workers::request::run_request_worker(sentinel, config, request_receiver)
- })?;
+ .spawn(move || workers::swarm::run_swarm_worker(sentinel, config, request_receiver))?;
handles.push(handle);
}
diff --git a/aquatic_http_private/src/workers/mod.rs b/aquatic_http_private/src/workers/mod.rs
index 63fc0ec..28ef095 100644
--- a/aquatic_http_private/src/workers/mod.rs
+++ b/aquatic_http_private/src/workers/mod.rs
@@ -1,2 +1,2 @@
-pub mod request;
pub mod socket;
+pub mod swarm;
diff --git a/aquatic_http_private/src/workers/socket/routes.rs b/aquatic_http_private/src/workers/socket/routes.rs
index 8fd139b..1d83f29 100644
--- a/aquatic_http_private/src/workers/socket/routes.rs
+++ b/aquatic_http_private/src/workers/socket/routes.rs
@@ -33,7 +33,7 @@ pub async fn announce(
let request = AnnounceRequest::from_query_string(&query)
.map_err(|_| FailureResponse::new("Malformed request"))?;
- let request_worker_index = RequestWorkerIndex::from_info_hash(&config, request.info_hash);
+ let swarm_worker_index = RequestWorkerIndex::from_info_hash(&config, request.info_hash);
let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned());
let source_addr = CanonicalSocketAddr::new(source_addr);
@@ -43,7 +43,7 @@ pub async fn announce(
.await?;
let response_receiver = request_sender
- .send_to(request_worker_index, validated_request, source_addr)
+ .send_to(swarm_worker_index, validated_request, source_addr)
.await
.map_err(|err| internal_error(format!("Sending request over channel failed: {:#}", err)))?;
diff --git a/aquatic_http_private/src/workers/request/common.rs b/aquatic_http_private/src/workers/swarm/common.rs
similarity index 100%
rename from aquatic_http_private/src/workers/request/common.rs
rename to aquatic_http_private/src/workers/swarm/common.rs
diff --git a/aquatic_http_private/src/workers/request/mod.rs b/aquatic_http_private/src/workers/swarm/mod.rs
similarity index 99%
rename from aquatic_http_private/src/workers/request/mod.rs
rename to aquatic_http_private/src/workers/swarm/mod.rs
index c684256..121c34d 100644
--- a/aquatic_http_private/src/workers/request/mod.rs
+++ b/aquatic_http_private/src/workers/swarm/mod.rs
@@ -21,7 +21,7 @@ use crate::config::Config;
use common::*;
-pub fn run_request_worker(
+pub fn run_swarm_worker(
_sentinel: PanicSentinel,
config: Config,
request_receiver: Receiver,
diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs
index 0bd6f11..8880dfe 100644
--- a/aquatic_udp/src/common.rs
+++ b/aquatic_udp/src/common.rs
@@ -43,11 +43,11 @@ pub enum ConnectedResponse {
pub struct SocketWorkerIndex(pub usize);
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
-pub struct RequestWorkerIndex(pub usize);
+pub struct SwarmWorkerIndex(pub usize);
-impl RequestWorkerIndex {
+impl SwarmWorkerIndex {
pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self {
- Self(info_hash.0[0] as usize % config.request_workers)
+ Self(info_hash.0[0] as usize % config.swarm_workers)
}
}
@@ -66,14 +66,14 @@ impl ConnectedRequestSender {
pub fn try_send_to(
&self,
- index: RequestWorkerIndex,
+ index: SwarmWorkerIndex,
request: ConnectedRequest,
addr: CanonicalSocketAddr,
) {
match self.senders[index.0].try_send((self.index, request, addr)) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
- ::log::error!("Request channel {} is full, dropping request. Try increasing number of request workers or raising config.worker_channel_size.", index.0)
+ ::log::error!("Request channel {} is full, dropping request. Try increasing number of swarm workers or raising config.worker_channel_size.", index.0)
}
Err(TrySendError::Disconnected(_)) => {
panic!("Request channel {} is disconnected", index.0);
@@ -145,7 +145,7 @@ pub struct Statistics {
}
impl Statistics {
- pub fn new(num_request_workers: usize) -> Self {
+ pub fn new(num_swarm_workers: usize) -> Self {
Self {
requests_received: Default::default(),
responses_sent_connect: Default::default(),
@@ -154,8 +154,8 @@ impl Statistics {
responses_sent_error: Default::default(),
bytes_received: Default::default(),
bytes_sent: Default::default(),
- torrents: Self::create_atomic_usize_vec(num_request_workers),
- peers: Self::create_atomic_usize_vec(num_request_workers),
+ torrents: Self::create_atomic_usize_vec(num_swarm_workers),
+ peers: Self::create_atomic_usize_vec(num_swarm_workers),
}
}
@@ -174,11 +174,11 @@ pub struct State {
}
impl State {
- pub fn new(num_request_workers: usize) -> Self {
+ pub fn new(num_swarm_workers: usize) -> Self {
Self {
access_list: Arc::new(AccessListArcSwap::default()),
- statistics_ipv4: Arc::new(Statistics::new(num_request_workers)),
- statistics_ipv6: Arc::new(Statistics::new(num_request_workers)),
+ statistics_ipv4: Arc::new(Statistics::new(num_swarm_workers)),
+ statistics_ipv6: Arc::new(Statistics::new(num_swarm_workers)),
}
}
}
diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs
index 858a70a..10cddd1 100644
--- a/aquatic_udp/src/config.rs
+++ b/aquatic_udp/src/config.rs
@@ -11,18 +11,18 @@ use aquatic_toml_config::TomlConfig;
#[serde(default)]
pub struct Config {
/// Socket workers receive requests from the socket, parse them and send
- /// them on to the request workers. They then receive responses from the
- /// request workers, encode them and send them back over the socket.
+ /// them on to the swarm workers. They then receive responses from the
+ /// swarm workers, encode them and send them back over the socket.
pub socket_workers: usize,
- /// Request workers receive a number of requests from socket workers,
+ /// Swarm workers receive a number of requests from socket workers,
/// generate responses and send them back to the socket workers.
- pub request_workers: usize,
+ pub swarm_workers: usize,
pub log_level: LogLevel,
/// Maximum number of items in each channel passing requests/responses
/// between workers. A value of zero means that the channel will be of
/// unbounded size.
pub worker_channel_size: usize,
- /// How long to block waiting for requests in request workers. Higher
+ /// How long to block waiting for requests in swarm workers. Higher
/// values means that with zero traffic, the worker will not unnecessarily
/// cause the CPU to wake up as often. However, high values (something like
/// larger than 1000) combined with very low traffic can cause delays
@@ -42,7 +42,7 @@ impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
- request_workers: 1,
+ swarm_workers: 1,
log_level: LogLevel::Error,
worker_channel_size: 0,
request_channel_recv_timeout_ms: 100,
@@ -182,7 +182,7 @@ pub struct CleaningConfig {
pub max_connection_age: u32,
/// Remove peers who have not announced for this long (seconds)
pub max_peer_age: u64,
- /// Remove pending scrape responses that have not been returned from request
+ /// Remove pending scrape responses that have not been returned from swarm
/// workers for this long (seconds)
pub max_pending_scrape_age: u64,
}
diff --git a/aquatic_udp/src/lib.rs b/aquatic_udp/src/lib.rs
index aa66e11..815d69a 100644
--- a/aquatic_udp/src/lib.rs
+++ b/aquatic_udp/src/lib.rs
@@ -17,7 +17,7 @@ use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::PanicSentinelWatcher;
use common::{
- ConnectedRequestSender, ConnectedResponseSender, RequestWorkerIndex, SocketWorkerIndex, State,
+ ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State, SwarmWorkerIndex,
};
use config::Config;
use workers::socket::validator::ConnectionValidator;
@@ -28,7 +28,7 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn run(config: Config) -> ::anyhow::Result<()> {
let mut signals = Signals::new([SIGUSR1, SIGTERM])?;
- let state = State::new(config.request_workers);
+ let state = State::new(config.swarm_workers);
let connection_validator = ConnectionValidator::new(&config)?;
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
@@ -41,7 +41,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let mut response_senders = Vec::new();
let mut response_receivers = BTreeMap::new();
- for i in 0..config.request_workers {
+ for i in 0..config.swarm_workers {
let (request_sender, request_receiver) = if config.worker_channel_size == 0 {
unbounded()
} else {
@@ -63,7 +63,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
response_receivers.insert(i, response_receiver);
}
- for i in 0..config.request_workers {
+ for i in 0..config.swarm_workers {
let sentinel = sentinel.clone();
let config = config.clone();
let state = state.clone();
@@ -71,26 +71,26 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let response_sender = ConnectedResponseSender::new(response_senders.clone());
Builder::new()
- .name(format!("request-{:02}", i + 1))
+ .name(format!("swarm-{:02}", i + 1))
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
- config.request_workers,
- WorkerIndex::RequestWorker(i),
+ config.swarm_workers,
+ WorkerIndex::SwarmWorker(i),
);
- workers::request::run_request_worker(
+ workers::swarm::run_swarm_worker(
sentinel,
config,
state,
request_receiver,
response_sender,
- RequestWorkerIndex(i),
+ SwarmWorkerIndex(i),
)
})
- .with_context(|| "spawn request worker")?;
+ .with_context(|| "spawn swarm worker")?;
}
for i in 0..config.socket_workers {
@@ -110,7 +110,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
- config.request_workers,
+ config.swarm_workers,
WorkerIndex::SocketWorker(i),
);
@@ -140,7 +140,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
- config.request_workers,
+ config.swarm_workers,
WorkerIndex::Util,
);
@@ -153,7 +153,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
- config.request_workers,
+ config.swarm_workers,
WorkerIndex::Util,
);
diff --git a/aquatic_udp/src/workers/mod.rs b/aquatic_udp/src/workers/mod.rs
index 668fd84..5446a1f 100644
--- a/aquatic_udp/src/workers/mod.rs
+++ b/aquatic_udp/src/workers/mod.rs
@@ -1,3 +1,3 @@
-pub mod request;
pub mod socket;
pub mod statistics;
+pub mod swarm;
diff --git a/aquatic_udp/src/workers/socket/requests.rs b/aquatic_udp/src/workers/socket/requests.rs
index 779f080..eb995bd 100644
--- a/aquatic_udp/src/workers/socket/requests.rs
+++ b/aquatic_udp/src/workers/socket/requests.rs
@@ -127,8 +127,7 @@ fn handle_request(
.load()
.allows(access_list_mode, &request.info_hash.0)
{
- let worker_index =
- RequestWorkerIndex::from_info_hash(config, request.info_hash);
+ let worker_index = SwarmWorkerIndex::from_info_hash(config, request.info_hash);
request_sender.try_send_to(
worker_index,
@@ -153,9 +152,9 @@ fn handle_request(
pending_scrape_valid_until,
);
- for (request_worker_index, request) in split_requests {
+ for (swarm_worker_index, request) in split_requests {
request_sender.try_send_to(
- request_worker_index,
+ swarm_worker_index,
ConnectedRequest::Scrape(request),
src,
);
diff --git a/aquatic_udp/src/workers/socket/storage.rs b/aquatic_udp/src/workers/socket/storage.rs
index 4717daf..ee5ec54 100644
--- a/aquatic_udp/src/workers/socket/storage.rs
+++ b/aquatic_udp/src/workers/socket/storage.rs
@@ -27,9 +27,9 @@ impl PendingScrapeResponseSlab {
config: &Config,
request: ScrapeRequest,
valid_until: ValidUntil,
- ) -> impl IntoIterator {
- let capacity = config.request_workers.min(request.info_hashes.len());
- let mut split_requests: HashMap =
+ ) -> impl IntoIterator {
+ let capacity = config.swarm_workers.min(request.info_hashes.len());
+ let mut split_requests: HashMap =
HashMap::with_capacity(capacity);
if request.info_hashes.is_empty() {
@@ -45,7 +45,7 @@ impl PendingScrapeResponseSlab {
for (i, info_hash) in request.info_hashes.into_iter().enumerate() {
let split_request = split_requests
- .entry(RequestWorkerIndex::from_info_hash(&config, info_hash))
+ .entry(SwarmWorkerIndex::from_info_hash(&config, info_hash))
.or_insert_with(|| PendingScrapeRequest {
slab_key,
info_hashes: BTreeMap::new(),
@@ -128,15 +128,15 @@ mod tests {
#[quickcheck]
fn test_pending_scrape_response_slab(
request_data: Vec<(i32, i64, u8)>,
- request_workers: u8,
+ swarm_workers: u8,
) -> TestResult {
- if request_workers == 0 {
+ if swarm_workers == 0 {
return TestResult::discard();
}
let mut config = Config::default();
- config.request_workers = request_workers as usize;
+ config.swarm_workers = swarm_workers as usize;
let valid_until = ValidUntil::new(1);
@@ -175,7 +175,7 @@ mod tests {
all_split_requests.push(
split_requests
.into_iter()
- .collect::>(),
+ .collect::>(),
);
}
@@ -185,7 +185,7 @@ mod tests {
for split_requests in all_split_requests {
for (worker_index, split_request) in split_requests {
- assert!(worker_index.0 < request_workers as usize);
+ assert!(worker_index.0 < swarm_workers as usize);
let torrent_stats = split_request
.info_hashes
diff --git a/aquatic_udp/src/workers/request/mod.rs b/aquatic_udp/src/workers/swarm/mod.rs
similarity index 98%
rename from aquatic_udp/src/workers/request/mod.rs
rename to aquatic_udp/src/workers/swarm/mod.rs
index 1b8335a..b2cc7ed 100644
--- a/aquatic_udp/src/workers/request/mod.rs
+++ b/aquatic_udp/src/workers/swarm/mod.rs
@@ -17,13 +17,13 @@ use crate::config::Config;
use storage::{Peer, TorrentMap, TorrentMaps};
-pub fn run_request_worker(
+pub fn run_swarm_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
response_sender: ConnectedResponseSender,
- worker_index: RequestWorkerIndex,
+ worker_index: SwarmWorkerIndex,
) {
let mut torrents = TorrentMaps::default();
let mut rng = SmallRng::from_entropy();
diff --git a/aquatic_udp/src/workers/request/storage.rs b/aquatic_udp/src/workers/swarm/storage.rs
similarity index 100%
rename from aquatic_udp/src/workers/request/storage.rs
rename to aquatic_udp/src/workers/swarm/storage.rs
diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs
index 89ca6da..3a7c256 100644
--- a/aquatic_udp_bench/src/main.rs
+++ b/aquatic_udp_bench/src/main.rs
@@ -8,7 +8,7 @@
//! ```
use aquatic_common::PanicSentinelWatcher;
-use aquatic_udp::workers::request::run_request_worker;
+use aquatic_udp::workers::swarm::run_swarm_worker;
use crossbeam_channel::unbounded;
use num_format::{Locale, ToFormattedString};
use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
@@ -53,16 +53,16 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> {
{
let config = aquatic_config.clone();
- let state = State::new(config.request_workers);
+ let state = State::new(config.swarm_workers);
::std::thread::spawn(move || {
- run_request_worker(
+ run_swarm_worker(
sentinel,
config,
state,
request_receiver,
response_sender,
- RequestWorkerIndex(0),
+ SwarmWorkerIndex(0),
)
});
}
diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs
index fbfe8bf..136ab9f 100644
--- a/aquatic_ws/src/config.rs
+++ b/aquatic_ws/src/config.rs
@@ -13,12 +13,12 @@ use aquatic_toml_config::TomlConfig;
#[serde(default)]
pub struct Config {
/// Socket workers receive requests from the socket, parse them and send
- /// them on to the request workers. They then receive responses from the
- /// request workers, encode them and send them back over the socket.
+ /// them on to the swarm workers. They then receive responses from the
+ /// swarm workers, encode them and send them back over the socket.
pub socket_workers: usize,
- /// Request workers receive a number of requests from socket workers,
+ /// Swarm workers receive a number of requests from socket workers,
/// generate responses and send them back to the socket workers.
- pub request_workers: usize,
+ pub swarm_workers: usize,
pub log_level: LogLevel,
pub network: NetworkConfig,
pub protocol: ProtocolConfig,
@@ -32,7 +32,7 @@ impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
- request_workers: 1,
+ swarm_workers: 1,
log_level: LogLevel::default(),
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),
diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs
index b6ffc22..daa77ed 100644
--- a/aquatic_ws/src/lib.rs
+++ b/aquatic_ws/src/lib.rs
@@ -32,7 +32,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
update_access_list(&config.access_list, &state.access_list)?;
- let num_peers = config.socket_workers + config.request_workers;
+ let num_peers = config.socket_workers + config.swarm_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16);
@@ -59,7 +59,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
- config.request_workers,
+ config.swarm_workers,
WorkerIndex::SocketWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("socket");
@@ -82,7 +82,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
executors.push(executor);
}
- for i in 0..(config.request_workers) {
+ for i in 0..(config.swarm_workers) {
let sentinel = sentinel.clone();
let config = config.clone();
let state = state.clone();
@@ -92,14 +92,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
- config.request_workers,
- WorkerIndex::RequestWorker(i),
+ config.swarm_workers,
+ WorkerIndex::SwarmWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("request");
let executor = builder
.spawn(move || async move {
- workers::request::run_request_worker(
+ workers::swarm::run_swarm_worker(
sentinel,
config,
state,
@@ -117,7 +117,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
set_affinity_for_util_worker(
&config.cpu_pinning,
config.socket_workers,
- config.request_workers,
+ config.swarm_workers,
)?;
}
diff --git a/aquatic_ws/src/workers/mod.rs b/aquatic_ws/src/workers/mod.rs
index 63fc0ec..28ef095 100644
--- a/aquatic_ws/src/workers/mod.rs
+++ b/aquatic_ws/src/workers/mod.rs
@@ -1,2 +1,2 @@
-pub mod request;
pub mod socket;
+pub mod swarm;
diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs
index cfda49a..42dfb5f 100644
--- a/aquatic_ws/src/workers/socket.rs
+++ b/aquatic_ws/src/workers/socket.rs
@@ -352,7 +352,7 @@ impl ConnectionReader {
)
.await
.unwrap();
- ::log::info!("sent message to request worker");
+ ::log::info!("sent message to swarm worker");
} else {
self.send_error_response("Info hash not allowed".into(), Some(info_hash))
.await?;
@@ -404,7 +404,7 @@ impl ConnectionReader {
.send_to(consumer_index, (meta, in_message))
.await
.unwrap();
- ::log::info!("sent message to request worker");
+ ::log::info!("sent message to swarm worker");
}
}
}
@@ -541,7 +541,7 @@ impl ConnectionWriter {
}
fn calculate_in_message_consumer_index(config: &Config, info_hash: InfoHash) -> usize {
- (info_hash.0[0] as usize) % config.request_workers
+ (info_hash.0[0] as usize) % config.swarm_workers
}
fn create_tcp_listener(
diff --git a/aquatic_ws/src/workers/request.rs b/aquatic_ws/src/workers/swarm.rs
similarity index 98%
rename from aquatic_ws/src/workers/request.rs
rename to aquatic_ws/src/workers/swarm.rs
index 92f07ad..87cdd48 100644
--- a/aquatic_ws/src/workers/request.rs
+++ b/aquatic_ws/src/workers/swarm.rs
@@ -127,7 +127,7 @@ impl TorrentMaps {
}
}
-pub async fn run_request_worker(
+pub async fn run_swarm_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
@@ -223,14 +223,14 @@ async fn handle_request_stream(
};
for (meta, out_message) in out_messages.drain(..) {
- ::log::info!("request worker trying to send OutMessage to socket worker");
+ ::log::info!("swarm worker trying to send OutMessage to socket worker");
out_message_senders
.send_to(meta.out_message_consumer_id.0, (meta, out_message))
.await
.expect("failed sending out_message to socket worker");
- ::log::info!("request worker sent OutMessage to socket worker");
+ ::log::info!("swarm worker sent OutMessage to socket worker");
}
},
)
diff --git a/documents/aquatic-architecture-2022-02-02.svg b/documents/aquatic-architecture-2022-02-02.svg
index 58fc08c..4d00454 100644
--- a/documents/aquatic-architecture-2022-02-02.svg
+++ b/documents/aquatic-architecture-2022-02-02.svg
@@ -1,3 +1,3 @@
-
\ No newline at end of file
+
\ No newline at end of file