http, http_private: rename request workers to swarm workers

This commit is contained in:
Joakim Frostegård 2022-07-04 11:52:51 +02:00
parent fb2794643d
commit c89406179b
12 changed files with 31 additions and 33 deletions

View file

@ -14,12 +14,12 @@ use aquatic_common::cli::LogLevel;
#[serde(default)]
pub struct Config {
/// Socket workers receive requests from the socket, parse them and send
/// them on to the request workers. They then receive responses from the
/// request workers, encode them and send them back over the socket.
/// them on to the swarm workers. They then receive responses from the
/// swarm workers, encode them and send them back over the socket.
pub socket_workers: usize,
/// Request workers receive a number of requests from socket workers,
/// Swarm workers receive a number of requests from socket workers,
/// generate responses and send them back to the socket workers.
pub request_workers: usize,
pub swarm_workers: usize,
pub log_level: LogLevel,
pub network: NetworkConfig,
pub protocol: ProtocolConfig,
@ -33,7 +33,7 @@ impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
request_workers: 1,
swarm_workers: 1,
log_level: LogLevel::default(),
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),

View file

@ -34,7 +34,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
update_access_list(&config.access_list, &state.access_list)?;
let num_peers = config.socket_workers + config.request_workers;
let num_peers = config.socket_workers + config.swarm_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
@ -59,7 +59,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
config.swarm_workers,
WorkerIndex::SocketWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("socket");
@ -81,7 +81,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
executors.push(executor);
}
for i in 0..(config.request_workers) {
for i in 0..(config.swarm_workers) {
let sentinel = sentinel.clone();
let config = config.clone();
let state = state.clone();
@ -90,14 +90,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let placement = get_worker_placement(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
config.swarm_workers,
WorkerIndex::SwarmWorker(i),
)?;
let builder = LocalExecutorBuilder::new(placement).name("request");
let executor = builder
.spawn(move || async move {
workers::request::run_request_worker(sentinel, config, state, request_mesh_builder)
workers::swarm::run_swarm_worker(sentinel, config, state, request_mesh_builder)
.await
})
.map_err(|err| anyhow::anyhow!("Spawning executor failed: {:#}", err))?;
@ -109,7 +109,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
set_affinity_for_util_worker(
&config.cpu_pinning,
config.socket_workers,
config.request_workers,
config.swarm_workers,
)?;
}

View file

@ -1,2 +1,2 @@
pub mod request;
pub mod socket;
pub mod swarm;

View file

@ -264,10 +264,10 @@ impl Connection {
/// Take a request and:
/// - Update connection ValidUntil
/// - Return error response if request is not allowed
/// - If it is an announce request, send it to request workers an await a
/// - If it is an announce request, send it to swarm workers an await a
/// response
/// - If it is a scrape requests, split it up, pass on the parts to
/// relevant request workers and await a response
/// relevant swarm workers and await a response
async fn handle_request(&mut self, request: Request) -> anyhow::Result<Response> {
if let Ok(mut slab) = self.connection_slab.try_borrow_mut() {
if let Some(reference) = slab.get_mut(self.connection_id.0) {
@ -448,7 +448,7 @@ impl Connection {
}
fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize {
(info_hash.0[0] as usize) % config.request_workers
(info_hash.0[0] as usize) % config.swarm_workers
}
fn create_tcp_listener(

View file

@ -157,7 +157,7 @@ impl TorrentMaps {
}
}
pub async fn run_request_worker(
pub async fn run_swarm_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
@ -235,7 +235,7 @@ async fn handle_request_stream<S>(
);
if let Err(err) = response_sender.connect().await.send(response).await {
::log::error!("request worker could not send announce response: {:#}", err);
::log::error!("swarm worker could not send announce response: {:#}", err);
}
}
ChannelRequest::Scrape {
@ -247,7 +247,7 @@ async fn handle_request_stream<S>(
handle_scrape_request(&config, &mut torrents.borrow_mut(), peer_addr, request);
if let Err(err) = response_sender.connect().await.send(response).await {
::log::error!("request worker could not send scrape response: {:#}", err);
::log::error!("swarm worker could not send scrape response: {:#}", err);
}
}
};

View file

@ -17,7 +17,7 @@ pub struct RequestWorkerIndex(pub usize);
impl RequestWorkerIndex {
pub fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self {
Self(info_hash.0[0] as usize % config.request_workers)
Self(info_hash.0[0] as usize % config.swarm_workers)
}
}

View file

@ -11,12 +11,12 @@ use aquatic_common::cli::LogLevel;
#[serde(default)]
pub struct Config {
/// Socket workers receive requests from the socket, parse them and send
/// them on to the request workers. They then receive responses from the
/// request workers, encode them and send them back over the socket.
/// them on to the swarm workers. They then receive responses from the
/// swarm workers, encode them and send them back over the socket.
pub socket_workers: usize,
/// Request workers receive a number of requests from socket workers,
/// Swarm workers receive a number of requests from socket workers,
/// generate responses and send them back to the socket workers.
pub request_workers: usize,
pub swarm_workers: usize,
pub worker_channel_size: usize,
/// Number of database connections to establish in each socket worker
pub db_connections_per_worker: u32,
@ -31,7 +31,7 @@ impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
request_workers: 1,
swarm_workers: 1,
worker_channel_size: 128,
db_connections_per_worker: 4,
log_level: LogLevel::default(),

View file

@ -30,7 +30,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let mut request_senders = Vec::new();
let mut request_receivers = VecDeque::new();
for _ in 0..config.request_workers {
for _ in 0..config.swarm_workers {
let (request_sender, request_receiver) = channel(config.worker_channel_size);
request_senders.push(request_sender);
@ -64,16 +64,14 @@ pub fn run(config: Config) -> anyhow::Result<()> {
handles.push(handle);
}
for _ in 0..config.request_workers {
for _ in 0..config.swarm_workers {
let sentinel = sentinel.clone();
let config = config.clone();
let request_receiver = request_receivers.pop_front().unwrap();
let handle = ::std::thread::Builder::new()
.name("request".into())
.spawn(move || {
workers::request::run_request_worker(sentinel, config, request_receiver)
})?;
.spawn(move || workers::swarm::run_swarm_worker(sentinel, config, request_receiver))?;
handles.push(handle);
}

View file

@ -1,2 +1,2 @@
pub mod request;
pub mod socket;
pub mod swarm;

View file

@ -33,7 +33,7 @@ pub async fn announce(
let request = AnnounceRequest::from_query_string(&query)
.map_err(|_| FailureResponse::new("Malformed request"))?;
let request_worker_index = RequestWorkerIndex::from_info_hash(&config, request.info_hash);
let swarm_worker_index = RequestWorkerIndex::from_info_hash(&config, request.info_hash);
let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned());
let source_addr = CanonicalSocketAddr::new(source_addr);
@ -43,7 +43,7 @@ pub async fn announce(
.await?;
let response_receiver = request_sender
.send_to(request_worker_index, validated_request, source_addr)
.send_to(swarm_worker_index, validated_request, source_addr)
.await
.map_err(|err| internal_error(format!("Sending request over channel failed: {:#}", err)))?;

View file

@ -21,7 +21,7 @@ use crate::config::Config;
use common::*;
pub fn run_request_worker(
pub fn run_swarm_worker(
_sentinel: PanicSentinel,
config: Config,
request_receiver: Receiver<ChannelAnnounceRequest>,