WIP: use shared swarm state in mio worker

This commit is contained in:
Joakim Frostegård 2024-02-10 11:21:48 +01:00
parent 53497308f1
commit 2da966098f
5 changed files with 79 additions and 255 deletions

View file

@ -23,6 +23,7 @@ use common::{
SwarmWorkerIndex,
};
use config::Config;
use swarm::TorrentMaps;
use workers::socket::ConnectionValidator;
use workers::swarm::SwarmWorker;
@ -32,81 +33,24 @@ pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
pub fn run(config: Config) -> ::anyhow::Result<()> {
let mut signals = Signals::new([SIGUSR1])?;
let state = State::default();
let state = State::new(&config);
let statistics = Statistics::new(&config);
let connection_validator = ConnectionValidator::new(&config)?;
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
let mut join_handles = Vec::new();
update_access_list(&config.access_list, &state.access_list)?;
let mut request_senders = Vec::new();
let mut request_receivers = BTreeMap::new();
let mut response_senders = Vec::new();
let mut response_receivers = BTreeMap::new();
let (statistics_sender, statistics_receiver) = unbounded();
for i in 0..config.swarm_workers {
let (request_sender, request_receiver) = bounded(config.worker_channel_size);
request_senders.push(request_sender);
request_receivers.insert(i, request_receiver);
}
for i in 0..config.socket_workers {
let (response_sender, response_receiver) = bounded(config.worker_channel_size);
response_senders.push(response_sender);
response_receivers.insert(i, response_receiver);
}
for i in 0..config.swarm_workers {
let config = config.clone();
let state = state.clone();
let request_receiver = request_receivers.remove(&i).unwrap().clone();
let response_sender = ConnectedResponseSender::new(response_senders.clone());
let statistics_sender = statistics_sender.clone();
let statistics = statistics.swarm[i].clone();
let handle = Builder::new()
.name(format!("swarm-{:02}", i + 1))
.spawn(move || {
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
config.swarm_workers,
WorkerIndex::SwarmWorker(i),
);
let mut worker = SwarmWorker {
config,
state,
statistics,
request_receiver,
response_sender,
statistics_sender,
worker_index: SwarmWorkerIndex(i),
};
worker.run()
})
.with_context(|| "spawn swarm worker")?;
join_handles.push((WorkerType::Swarm(i), handle));
}
for i in 0..config.socket_workers {
let state = state.clone();
let config = config.clone();
let connection_validator = connection_validator.clone();
let request_sender =
ConnectedRequestSender::new(SocketWorkerIndex(i), request_senders.clone());
let response_receiver = response_receivers.remove(&i).unwrap();
let priv_dropper = priv_dropper.clone();
let statistics = statistics.socket[i].clone();
let statistics_sender = statistics_sender.clone();
let handle = Builder::new()
.name(format!("socket-{:02}", i + 1))
@ -123,9 +67,8 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
config,
state,
statistics,
statistics_sender,
connection_validator,
request_sender,
response_receiver,
priv_dropper,
)
})