aquatic_http: glommio: periodically remove closed connections

This commit is contained in:
Joakim Frostegård 2021-10-27 13:19:02 +02:00
parent 00e77e8655
commit 1a4e5750a3
2 changed files with 32 additions and 6 deletions

View file

@ -6,7 +6,7 @@
* privdrop * privdrop
* clean out connections regularly * clean out connections regularly
* timeout inside of task for "it took to long to receive request, send response"? * timeout inside of task for "it took to long to receive request, send response"?
* remove finished tasks from slab * handle panicked/cancelled tasks
* test with load tester with multiple workers * test with load tester with multiple workers
* get rid of / improve ConnectionMeta stuff in handler * get rid of / improve ConnectionMeta stuff in handler
* consider better error type for request parsing, so that better error * consider better error type for request parsing, so that better error

View file

@ -5,6 +5,7 @@ use std::net::SocketAddr;
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use aquatic_http_protocol::common::InfoHash; use aquatic_http_protocol::common::InfoHash;
use aquatic_http_protocol::request::{AnnounceRequest, Request, RequestParseError, ScrapeRequest}; use aquatic_http_protocol::request::{AnnounceRequest, Request, RequestParseError, ScrapeRequest};
@ -16,8 +17,9 @@ use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
use glommio::channels::shared_channel::ConnectedReceiver; use glommio::channels::shared_channel::ConnectedReceiver;
use glommio::net::{TcpListener, TcpStream}; use glommio::net::{TcpListener, TcpStream};
use glommio::prelude::*; use glommio::{enclose, prelude::*};
use glommio::task::JoinHandle; use glommio::task::JoinHandle;
use glommio::timer::TimerActionRepeat;
use rustls::ServerConnection; use rustls::ServerConnection;
use slab::Slab; use slab::Slab;
@ -70,6 +72,27 @@ pub async fn run_socket_worker(
let response_consumer_id = ConsumerId(response_receivers.consumer_id().unwrap()); let response_consumer_id = ConsumerId(response_receivers.consumer_id().unwrap());
let connection_slab = Rc::new(RefCell::new(Slab::new())); let connection_slab = Rc::new(RefCell::new(Slab::new()));
let connections_to_remove = Rc::new(RefCell::new(Vec::new()));
// Periodically remove closed connections
TimerActionRepeat::repeat(enclose!((config, connection_slab, connections_to_remove) move || {
enclose!((config, connection_slab, connections_to_remove) move || async move {
let connections_to_remove = connections_to_remove.replace(Vec::new());
for connection_id in connections_to_remove {
if let Some(_) = connection_slab.borrow_mut().try_remove(connection_id) {
::log::debug!("removed connection with id {}", connection_id);
} else {
::log::error!(
"couldn't remove connection with id {}, it is not in connection slab",
connection_id
);
}
}
Some(Duration::from_secs(config.cleaning.interval))
})()
}));
for (_, response_receiver) in response_receivers.streams() { for (_, response_receiver) in response_receivers.streams() {
spawn_local(receive_responses( spawn_local(receive_responses(
@ -88,8 +111,9 @@ pub async fn run_socket_worker(
let mut slab = connection_slab.borrow_mut(); let mut slab = connection_slab.borrow_mut();
let entry = slab.vacant_entry(); let entry = slab.vacant_entry();
let key = entry.key();
let conn = Connection { let mut conn = Connection {
config: config.clone(), config: config.clone(),
request_senders: request_senders.clone(), request_senders: request_senders.clone(),
response_receiver, response_receiver,
@ -102,13 +126,15 @@ pub async fn run_socket_worker(
pending_scrape_response: None, pending_scrape_response: None,
}; };
async fn handle_stream(mut conn: Connection) { let connections_to_remove = connections_to_remove.clone();
let handle = spawn_local(async move {
if let Err(err) = conn.handle_stream().await { if let Err(err) = conn.handle_stream().await {
::log::info!("conn.handle_stream() error: {:?}", err); ::log::info!("conn.handle_stream() error: {:?}", err);
} }
}
let handle = spawn_local(handle_stream(conn)).detach(); connections_to_remove.borrow_mut().push(key);
}).detach();
let connection_reference = ConnectionReference { let connection_reference = ConnectionReference {
response_sender, response_sender,