From 99632d4be59daef6c325e12bc1aa66c84b122c40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 18 Nov 2021 22:13:07 +0100 Subject: [PATCH] udp: implement torrent map cleaning for new, sharded torrent state --- Cargo.lock | 1 - TODO.md | 1 - aquatic_udp/Cargo.toml | 1 - aquatic_udp/src/lib/common/mod.rs | 3 -- aquatic_udp/src/lib/handlers.rs | 24 +++++++++++-- aquatic_udp/src/lib/lib.rs | 51 ++++++---------------------- aquatic_udp/src/lib/tasks.rs | 56 ------------------------------- aquatic_udp_bench/src/main.rs | 8 +++-- 8 files changed, 38 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e2b2f4..9325c50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -184,7 +184,6 @@ dependencies = [ "cfg-if", "crossbeam-channel", "hex", - "histogram", "io-uring", "libc", "log", diff --git a/TODO.md b/TODO.md index b64834d..488b937 100644 --- a/TODO.md +++ b/TODO.md @@ -22,7 +22,6 @@ with 6 socket and 4 request workers. performance is great overall and faster than without sharding. io_uring impl is a lot behind mio impl with new load tester * what poll event capacity is actually needed? - * clean torrent map in workers, remove it from shared state * mio * stagger connection cleaning intervals? * uring diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index f2756e4..8e9ecb9 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -28,7 +28,6 @@ aquatic_udp_protocol = "0.1.0" cfg-if = "1" crossbeam-channel = "0.5" hex = "0.4" -histogram = "0.6" log = "0.4" mimalloc = { version = "0.1", default-features = false } parking_lot = "0.11" diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index 72f9862..be382b9 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -6,7 +6,6 @@ use std::sync::Arc; use std::time::Instant; use crossbeam_channel::Sender; -use parking_lot::Mutex; use socket2::{Domain, Protocol, Socket, Type}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap}; @@ -304,7 +303,6 @@ pub struct Statistics { #[derive(Clone)] pub struct State { pub access_list: Arc, - pub torrents: Arc>, pub statistics: Arc, } @@ -312,7 +310,6 @@ impl Default for State { fn default() -> Self { Self { access_list: Arc::new(AccessListArcSwap::default()), - torrents: Arc::new(Mutex::new(TorrentMaps::default())), statistics: Arc::new(Statistics::default()), } } diff --git a/aquatic_udp/src/lib/handlers.rs b/aquatic_udp/src/lib/handlers.rs index 1f478c4..dcf2d28 100644 --- a/aquatic_udp/src/lib/handlers.rs +++ b/aquatic_udp/src/lib/handlers.rs @@ -2,6 +2,7 @@ use std::collections::BTreeMap; use std::net::IpAddr; use std::net::SocketAddr; use std::time::Duration; +use std::time::Instant; use aquatic_common::ValidUntil; use crossbeam_channel::Receiver; @@ -16,6 +17,7 @@ use crate::config::Config; pub fn run_request_worker( config: Config, + state: State, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>, response_sender: ConnectedResponseSender, ) { @@ -23,11 +25,15 @@ pub fn run_request_worker( let mut small_rng = SmallRng::from_entropy(); let timeout = Duration::from_millis(config.handlers.channel_recv_timeout_ms); + let mut peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + + let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval); + + let mut iter_counter = 0usize; + let mut last_cleaning = Instant::now(); loop { if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) { - let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - let response = match request { ConnectedRequest::Announce(request) => handle_announce_request( &config, @@ -45,7 +51,19 @@ pub fn run_request_worker( response_sender.try_send_to(sender_index, response, src); } - // TODO: clean torrent map, update peer_valid_until + if iter_counter % 128 == 0 { + peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + + let now = Instant::now(); + + if now > last_cleaning + cleaning_interval { + torrents.clean(&config, &state.access_list); + + last_cleaning = now; + } + } + + iter_counter = iter_counter.wrapping_add(1); } } diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index e321e95..014e0ca 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -35,33 +35,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; - { - let config = config.clone(); - let state = state.clone(); - - ::std::thread::spawn(move || run_inner(config, state)); - } - - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - - for signal in &mut signals { - match signal { - SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); - } - _ => unreachable!(), - } - } - - Ok(()) -} - -pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { let num_bound_sockets = Arc::new(AtomicUsize::new(0)); let mut request_senders = Vec::new(); @@ -86,6 +59,7 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { for i in 0..config.request_workers { let config = config.clone(); + let state = state.clone(); let request_receiver = request_receivers.remove(&i).unwrap().clone(); let response_sender = ConnectedResponseSender::new(response_senders.clone()); @@ -99,7 +73,7 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { WorkerIndex::RequestWorker(i), ); - handlers::run_request_worker(config, request_receiver, response_sender) + handlers::run_request_worker(config, state, request_receiver, response_sender) }) .with_context(|| "spawn request worker")?; } @@ -146,12 +120,6 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { .with_context(|| "spawn socket worker")?; } - ::std::mem::drop(request_senders); - ::std::mem::drop(request_receivers); - - ::std::mem::drop(response_senders); - ::std::mem::drop(response_receivers); - if config.statistics.interval != 0 { let state = state.clone(); let config = config.clone(); @@ -189,11 +157,14 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { WorkerIndex::Other, ); - loop { - ::std::thread::sleep(Duration::from_secs( - config.cleaning.torrent_cleaning_interval, - )); - - state.torrents.lock().clean(&config, &state.access_list); + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); + } + _ => unreachable!(), + } } + + Ok(()) } diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/tasks.rs index c4bcac3..8e22560 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/tasks.rs @@ -1,7 +1,5 @@ use std::sync::atomic::Ordering; -use histogram::Histogram; - use super::common::*; use crate::config::Config; @@ -38,59 +36,5 @@ pub fn gather_and_print_statistics(state: &State, config: &Config) { bytes_sent_per_second * 8.0 / 1_000_000.0, ); - let mut total_num_torrents_ipv4 = 0usize; - let mut total_num_torrents_ipv6 = 0usize; - let mut total_num_peers_ipv4 = 0usize; - let mut total_num_peers_ipv6 = 0usize; - - let mut peers_per_torrent = Histogram::new(); - - { - let torrents = &mut state.torrents.lock(); - - for torrent in torrents.ipv4.values() { - let num_peers = torrent.num_seeders + torrent.num_leechers; - - if let Err(err) = peers_per_torrent.increment(num_peers as u64) { - ::log::error!("error incrementing peers_per_torrent histogram: {}", err) - } - - total_num_peers_ipv4 += num_peers; - } - for torrent in torrents.ipv6.values() { - let num_peers = torrent.num_seeders + torrent.num_leechers; - - if let Err(err) = peers_per_torrent.increment(num_peers as u64) { - ::log::error!("error incrementing peers_per_torrent histogram: {}", err) - } - - total_num_peers_ipv6 += num_peers; - } - - total_num_torrents_ipv4 += torrents.ipv4.len(); - total_num_torrents_ipv6 += torrents.ipv6.len(); - } - - println!( - "ipv4 torrents: {}, peers: {}; ipv6 torrents: {}, peers: {}", - total_num_torrents_ipv4, - total_num_peers_ipv4, - total_num_torrents_ipv6, - total_num_peers_ipv6, - ); - - if peers_per_torrent.entries() != 0 { - println!( - "peers per torrent: min: {}, p50: {}, p75: {}, p90: {}, p99: {}, p999: {}, max: {}", - peers_per_torrent.minimum().unwrap(), - peers_per_torrent.percentile(50.0).unwrap(), - peers_per_torrent.percentile(75.0).unwrap(), - peers_per_torrent.percentile(90.0).unwrap(), - peers_per_torrent.percentile(99.0).unwrap(), - peers_per_torrent.percentile(99.9).unwrap(), - peers_per_torrent.maximum().unwrap(), - ); - } - println!(); } diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 1bc0034..6c77bee 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -39,7 +39,9 @@ fn main() { pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { // Setup common state, spawn request handlers - let aquatic_config = Config::default(); + let mut aquatic_config = Config::default(); + + aquatic_config.cleaning.torrent_cleaning_interval = 60 * 60 * 24; let (request_sender, request_receiver) = unbounded(); let (response_sender, response_receiver) = unbounded(); @@ -49,7 +51,9 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { { let config = aquatic_config.clone(); - ::std::thread::spawn(move || run_request_worker(config, request_receiver, response_sender)); + ::std::thread::spawn(move || { + run_request_worker(config, State::default(), request_receiver, response_sender) + }); } // Run benchmarks