diff --git a/TODO.md b/TODO.md index e363f42..6ceb42a 100644 --- a/TODO.md +++ b/TODO.md @@ -6,7 +6,7 @@ * privdrop * clean out connections regularly * timeout inside of task for "it took to long to receive request, send response"? - * remove finished tasks from slab + * handle panicked/cancelled tasks * test with load tester with multiple workers * get rid of / improve ConnectionMeta stuff in handler * consider better error type for request parsing, so that better error diff --git a/aquatic_http/src/lib/glommio/network.rs b/aquatic_http/src/lib/glommio/network.rs index 117e3d3..9ca5984 100644 --- a/aquatic_http/src/lib/glommio/network.rs +++ b/aquatic_http/src/lib/glommio/network.rs @@ -5,6 +5,7 @@ use std::net::SocketAddr; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::time::Duration; use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::request::{AnnounceRequest, Request, RequestParseError, ScrapeRequest}; @@ -16,8 +17,9 @@ use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; use glommio::channels::shared_channel::ConnectedReceiver; use glommio::net::{TcpListener, TcpStream}; -use glommio::prelude::*; +use glommio::{enclose, prelude::*}; use glommio::task::JoinHandle; +use glommio::timer::TimerActionRepeat; use rustls::ServerConnection; use slab::Slab; @@ -70,6 +72,27 @@ pub async fn run_socket_worker( let response_consumer_id = ConsumerId(response_receivers.consumer_id().unwrap()); let connection_slab = Rc::new(RefCell::new(Slab::new())); + let connections_to_remove = Rc::new(RefCell::new(Vec::new())); + + // Periodically remove closed connections + TimerActionRepeat::repeat(enclose!((config, connection_slab, connections_to_remove) move || { + enclose!((config, connection_slab, connections_to_remove) move || async move { + let connections_to_remove = connections_to_remove.replace(Vec::new()); + + for connection_id in connections_to_remove { + if let Some(_) = connection_slab.borrow_mut().try_remove(connection_id) { + ::log::debug!("removed connection with id {}", connection_id); + } else { + ::log::error!( + "couldn't remove connection with id {}, it is not in connection slab", + connection_id + ); + } + } + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); for (_, response_receiver) in response_receivers.streams() { spawn_local(receive_responses( @@ -88,8 +111,9 @@ pub async fn run_socket_worker( let mut slab = connection_slab.borrow_mut(); let entry = slab.vacant_entry(); + let key = entry.key(); - let conn = Connection { + let mut conn = Connection { config: config.clone(), request_senders: request_senders.clone(), response_receiver, @@ -102,13 +126,15 @@ pub async fn run_socket_worker( pending_scrape_response: None, }; - async fn handle_stream(mut conn: Connection) { + let connections_to_remove = connections_to_remove.clone(); + + let handle = spawn_local(async move { if let Err(err) = conn.handle_stream().await { ::log::info!("conn.handle_stream() error: {:?}", err); } - } - let handle = spawn_local(handle_stream(conn)).detach(); + connections_to_remove.borrow_mut().push(key); + }).detach(); let connection_reference = ConnectionReference { response_sender,