From d922e5e6802e6aec13a6b81624fa0e841f544ff2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 2 Nov 2021 20:32:13 +0100 Subject: [PATCH] aquatic_udp: mio: update access list on SIGHUP instead of regularly --- Cargo.lock | 21 +++++++ aquatic_common/Cargo.toml | 1 + aquatic_common/src/access_list.rs | 24 ++++++-- aquatic_udp/Cargo.toml | 1 + aquatic_udp/src/lib/mio/mod.rs | 91 +++++++++++++++++-------------- 5 files changed, 92 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0735770..1f2308c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,7 @@ dependencies = [ "hashbrown 0.11.2", "hex", "indexmap", + "log", "privdrop", "rand", "serde", @@ -181,6 +182,7 @@ dependencies = [ "quickcheck_macros", "rand", "serde", + "signal-hook", "socket2 0.4.2", ] @@ -1754,6 +1756,25 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "signal-hook" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + [[package]] name = "simd-json" version = "0.4.8" diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index e86e0d0..3f34d1f 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -16,6 +16,7 @@ arc-swap = "1" hashbrown = "0.11.2" hex = "0.4" indexmap = "1" +log = "0.4" privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index 69f856c..a2c3f31 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -75,17 +75,31 @@ impl AccessList { } pub trait AccessListQuery { - fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()>; + fn update(&self, config: &AccessListConfig); fn allows(&self, list_mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool; } pub type AccessListArcSwap = ArcSwap; impl AccessListQuery for AccessListArcSwap { - fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> { - self.store(Arc::new(AccessList::create_from_path(path)?)); - - Ok(()) + fn update(&self, config: &AccessListConfig) { + match config.mode { + AccessListMode::White | AccessListMode::Black => { + match AccessList::create_from_path(&config.path) { + Ok(new) => { + self.store(Arc::new(new)); + } + Err(err) => { + ::log::error!("Updating access list failed: {:?}", err); + } + } + } + AccessListMode::Off => { + ::log::error!( + "AccessListQuery::update_from_path called, but AccessListMode is Off" + ); + } + } } fn allows(&self, mode: AccessListMode, info_hash_bytes: &[u8; 20]) -> bool { diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index a7bf538..e418054 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -34,6 +34,7 @@ mimalloc = { version = "0.1", default-features = false } parking_lot = "0.11" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } +signal-hook = { version = "0.3" } # mio crossbeam-channel = { version = "0.5", optional = true } diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index 1b8e656..b247c8b 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -9,15 +9,17 @@ use anyhow::Context; use aquatic_common::privileges::drop_privileges_after_socket_binding; use crossbeam_channel::unbounded; +use aquatic_common::access_list::AccessListQuery; +use signal_hook::consts::SIGHUP; +use signal_hook::iterator::Signals; + +use crate::config::Config; + pub mod common; pub mod handlers; pub mod network; pub mod tasks; -use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; - -use crate::config::Config; - use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { @@ -27,38 +29,42 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { }); } + let mut signals = Signals::new(::std::iter::once(SIGHUP))?; + let state = State::default(); - update_access_list(&config, &state.access_list); + { + let config = config.clone(); + let state = state.clone(); + + ::std::thread::spawn(move || run_inner(config, state)); + } + + for signal in &mut signals { + match signal { + SIGHUP => { + ::log::info!("Updating access list"); + + state.access_list.update(&config.access_list); + } + _ => unreachable!(), + } + } + + Ok(()) +} + +pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { + if config.cpu_pinning.active { + core_affinity::set_for_current(core_affinity::CoreId { + id: config.cpu_pinning.offset, + }); + } + + state.access_list.update(&config.access_list); let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?; - - drop_privileges_after_socket_binding( - &config.privileges, - num_bound_sockets, - config.socket_workers, - ) - .unwrap(); - - loop { - ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - - update_access_list(&config, &state.access_list); - - state - .torrents - .lock() - .clean(&config, state.access_list.load_full().deref()); - } -} - -pub fn start_workers( - config: Config, - state: State, - num_bound_sockets: Arc, -) -> ::anyhow::Result<()> { let (request_sender, request_receiver) = unbounded(); let (response_sender, response_receiver) = unbounded(); @@ -132,16 +138,19 @@ pub fn start_workers( .with_context(|| "spawn statistics worker")?; } - Ok(()) -} + drop_privileges_after_socket_binding( + &config.privileges, + num_bound_sockets, + config.socket_workers, + ) + .unwrap(); -pub fn update_access_list(config: &Config, access_list: &Arc) { - match config.access_list.mode { - AccessListMode::White | AccessListMode::Black => { - if let Err(err) = access_list.update_from_path(&config.access_list.path) { - ::log::error!("Update access list from path: {:?}", err); - } - } - AccessListMode::Off => {} + loop { + ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); + + state + .torrents + .lock() + .clean(&config, state.access_list.load_full().deref()); } }