udp: implement torrent map cleaning for new, sharded torrent state

This commit is contained in:
Joakim Frostegård 2021-11-18 22:13:07 +01:00
parent 54149ed3eb
commit 99632d4be5
8 changed files with 38 additions and 107 deletions

View file

@ -35,33 +35,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let mut signals = Signals::new(::std::iter::once(SIGUSR1))?;
{
let config = config.clone();
let state = state.clone();
::std::thread::spawn(move || run_inner(config, state));
}
#[cfg(feature = "cpu-pinning")]
pin_current_if_configured_to(
&config.cpu_pinning,
config.socket_workers,
WorkerIndex::Other,
);
for signal in &mut signals {
match signal {
SIGUSR1 => {
let _ = update_access_list(&config.access_list, &state.access_list);
}
_ => unreachable!(),
}
}
Ok(())
}
pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let mut request_senders = Vec::new();
@ -86,6 +59,7 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
for i in 0..config.request_workers {
let config = config.clone();
let state = state.clone();
let request_receiver = request_receivers.remove(&i).unwrap().clone();
let response_sender = ConnectedResponseSender::new(response_senders.clone());
@ -99,7 +73,7 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
WorkerIndex::RequestWorker(i),
);
handlers::run_request_worker(config, request_receiver, response_sender)
handlers::run_request_worker(config, state, request_receiver, response_sender)
})
.with_context(|| "spawn request worker")?;
}
@ -146,12 +120,6 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
.with_context(|| "spawn socket worker")?;
}
::std::mem::drop(request_senders);
::std::mem::drop(request_receivers);
::std::mem::drop(response_senders);
::std::mem::drop(response_receivers);
if config.statistics.interval != 0 {
let state = state.clone();
let config = config.clone();
@ -189,11 +157,14 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
WorkerIndex::Other,
);
loop {
::std::thread::sleep(Duration::from_secs(
config.cleaning.torrent_cleaning_interval,
));
state.torrents.lock().clean(&config, &state.access_list);
for signal in &mut signals {
match signal {
SIGUSR1 => {
let _ = update_access_list(&config.access_list, &state.access_list);
}
_ => unreachable!(),
}
}
Ok(())
}