From 6d8646351ca14076eb8531c057da4dd1a058110f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 20 Mar 2022 19:34:53 +0100 Subject: [PATCH] http: implement connection cleaning --- aquatic_http/src/config.rs | 8 +++- aquatic_http/src/workers/socket.rs | 77 +++++++++++++++++++----------- 2 files changed, 55 insertions(+), 30 deletions(-) diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index 3bb54b8..6a0b883 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -46,7 +46,7 @@ pub struct NetworkConfig { pub tls_certificate_path: PathBuf, /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) pub tls_private_key_path: PathBuf, - /// Keep connections alive + /// Keep connections alive after sending a response pub keep_alive: bool, } @@ -66,8 +66,12 @@ pub struct ProtocolConfig { pub struct CleaningConfig { /// Clean peers this often (seconds) pub torrent_cleaning_interval: u64, + /// Clean connections this often (seconds) + pub connection_cleaning_interval: u64, /// Remove peers that have not announced for this long (seconds) pub max_peer_age: u64, + /// Remove connections that haven't seen valid requests for this long (seconds) + pub max_connection_idle: u64, } impl Default for Config { @@ -114,7 +118,9 @@ impl Default for CleaningConfig { fn default() -> Self { Self { torrent_cleaning_interval: 30, + connection_cleaning_interval: 60, max_peer_age: 1800, + max_connection_idle: 180, } } } diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index e7e9d35..8c8b512 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -4,7 +4,7 @@ use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::CanonicalSocketAddr; @@ -21,6 +21,7 @@ use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; use glommio::channels::shared_channel::ConnectedReceiver; use glommio::net::{TcpListener, TcpStream}; +use glommio::task::JoinHandle; use glommio::timer::TimerActionRepeat; use glommio::{enclose, prelude::*}; use once_cell::sync::Lazy; @@ -45,7 +46,9 @@ struct PendingScrapeResponse { } struct ConnectionReference { + task_handle: Option>, response_sender: LocalSender, + valid_until: ValidUntil, } pub async fn run_socket_worker( @@ -69,18 +72,14 @@ pub async fn run_socket_worker( let response_consumer_id = ConsumerId(response_receivers.consumer_id().unwrap()); let connection_slab = Rc::new(RefCell::new(Slab::new())); - let connections_to_remove = Rc::new(RefCell::new(Vec::new())); // Periodically remove closed connections - TimerActionRepeat::repeat( - enclose!((config, connection_slab, connections_to_remove) move || { - remove_closed_connections( - config.clone(), - connection_slab.clone(), - connections_to_remove.clone(), - ) - }), - ); + TimerActionRepeat::repeat(enclose!((config, connection_slab) move || { + clean_connections( + config.clone(), + connection_slab.clone(), + ) + })); for (_, response_receiver) in response_receivers.streams() { spawn_local(receive_responses( @@ -96,11 +95,14 @@ pub async fn run_socket_worker( match stream { Ok(stream) => { let (response_sender, response_receiver) = new_bounded(config.request_workers); - let key = connection_slab - .borrow_mut() - .insert(ConnectionReference { response_sender }); - spawn_local(enclose!((config, access_list, request_senders, tls_config, connections_to_remove) async move { + let key = connection_slab.borrow_mut().insert(ConnectionReference { + task_handle: None, + response_sender, + valid_until: ValidUntil::new(config.cleaning.max_connection_idle), + }); + + let task_handle = spawn_local(enclose!((config, access_list, request_senders, tls_config, connection_slab) async move { if let Err(err) = Connection::run( config, access_list, @@ -109,14 +111,19 @@ pub async fn run_socket_worker( response_consumer_id, ConnectionId(key), tls_config, + connection_slab.clone(), stream ).await { ::log::debug!("Connection::run() error: {:?}", err); } - connections_to_remove.borrow_mut().push(key); + connection_slab.borrow_mut().try_remove(key); })) .detach(); + + if let Some(reference) = connection_slab.borrow_mut().get_mut(key) { + reference.task_handle = Some(task_handle); + } } Err(err) => { ::log::error!("accept connection: {:?}", err); @@ -125,26 +132,28 @@ pub async fn run_socket_worker( } } -async fn remove_closed_connections( +async fn clean_connections( config: Rc, connection_slab: Rc>>, - connections_to_remove: Rc>>, ) -> Option { - let connections_to_remove = connections_to_remove.replace(Vec::new()); + let now = Instant::now(); - for connection_id in connections_to_remove { - if let Some(_) = connection_slab.borrow_mut().try_remove(connection_id) { - ::log::debug!("removed connection with id {}", connection_id); - } else { - ::log::error!( - "couldn't remove connection with id {}, it is not in connection slab", - connection_id - ); + connection_slab.borrow_mut().retain(|_, reference| { + let keep = reference.valid_until.0 > now; + + if !keep { + if let Some(ref handle) = reference.task_handle { + handle.cancel(); + } } - } + + keep + }); + + connection_slab.borrow_mut().shrink_to_fit(); Some(Duration::from_secs( - config.cleaning.torrent_cleaning_interval, + config.cleaning.connection_cleaning_interval, )) } @@ -170,6 +179,7 @@ struct Connection { request_senders: Rc>, response_receiver: LocalReceiver, response_consumer_id: ConsumerId, + connection_slab: Rc>>, stream: TlsStream, peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, @@ -187,6 +197,7 @@ impl Connection { response_consumer_id: ConsumerId, connection_id: ConnectionId, tls_config: Arc, + connection_slab: Rc>>, stream: TcpStream, ) -> anyhow::Result<()> { let peer_addr = stream @@ -207,6 +218,7 @@ impl Connection { request_senders: request_senders.clone(), response_receiver, response_consumer_id, + connection_slab, stream, peer_addr, connection_id, @@ -289,12 +301,19 @@ impl Connection { } /// Take a request and: + /// - Update connection ValidUntil /// - Return error response if request is not allowed /// - If it is an announce request, send it to request workers an await a /// response /// - If it is a scrape requests, split it up, pass on the parts to /// relevant request workers and await a response async fn handle_request(&mut self, request: Request) -> anyhow::Result { + if let Ok(mut slab) = self.connection_slab.try_borrow_mut() { + if let Some(reference) = slab.get_mut(self.connection_id.0) { + reference.valid_until = ValidUntil::new(self.config.cleaning.max_connection_idle); + } + } + match request { Request::Announce(request) => { let info_hash = request.info_hash;