mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
http: implement connection cleaning
This commit is contained in:
parent
66dd92e575
commit
6d8646351c
2 changed files with 55 additions and 30 deletions
|
|
@ -46,7 +46,7 @@ pub struct NetworkConfig {
|
||||||
pub tls_certificate_path: PathBuf,
|
pub tls_certificate_path: PathBuf,
|
||||||
/// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format)
|
/// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format)
|
||||||
pub tls_private_key_path: PathBuf,
|
pub tls_private_key_path: PathBuf,
|
||||||
/// Keep connections alive
|
/// Keep connections alive after sending a response
|
||||||
pub keep_alive: bool,
|
pub keep_alive: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -66,8 +66,12 @@ pub struct ProtocolConfig {
|
||||||
pub struct CleaningConfig {
|
pub struct CleaningConfig {
|
||||||
/// Clean peers this often (seconds)
|
/// Clean peers this often (seconds)
|
||||||
pub torrent_cleaning_interval: u64,
|
pub torrent_cleaning_interval: u64,
|
||||||
|
/// Clean connections this often (seconds)
|
||||||
|
pub connection_cleaning_interval: u64,
|
||||||
/// Remove peers that have not announced for this long (seconds)
|
/// Remove peers that have not announced for this long (seconds)
|
||||||
pub max_peer_age: u64,
|
pub max_peer_age: u64,
|
||||||
|
/// Remove connections that haven't seen valid requests for this long (seconds)
|
||||||
|
pub max_connection_idle: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
|
|
@ -114,7 +118,9 @@ impl Default for CleaningConfig {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
torrent_cleaning_interval: 30,
|
torrent_cleaning_interval: 30,
|
||||||
|
connection_cleaning_interval: 60,
|
||||||
max_peer_age: 1800,
|
max_peer_age: 1800,
|
||||||
|
max_connection_idle: 180,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ use std::os::unix::prelude::{FromRawFd, IntoRawFd};
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
|
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
|
||||||
use aquatic_common::CanonicalSocketAddr;
|
use aquatic_common::CanonicalSocketAddr;
|
||||||
|
|
@ -21,6 +21,7 @@ use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
|
||||||
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
|
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
|
||||||
use glommio::channels::shared_channel::ConnectedReceiver;
|
use glommio::channels::shared_channel::ConnectedReceiver;
|
||||||
use glommio::net::{TcpListener, TcpStream};
|
use glommio::net::{TcpListener, TcpStream};
|
||||||
|
use glommio::task::JoinHandle;
|
||||||
use glommio::timer::TimerActionRepeat;
|
use glommio::timer::TimerActionRepeat;
|
||||||
use glommio::{enclose, prelude::*};
|
use glommio::{enclose, prelude::*};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
|
|
@ -45,7 +46,9 @@ struct PendingScrapeResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ConnectionReference {
|
struct ConnectionReference {
|
||||||
|
task_handle: Option<JoinHandle<()>>,
|
||||||
response_sender: LocalSender<ChannelResponse>,
|
response_sender: LocalSender<ChannelResponse>,
|
||||||
|
valid_until: ValidUntil,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run_socket_worker(
|
pub async fn run_socket_worker(
|
||||||
|
|
@ -69,18 +72,14 @@ pub async fn run_socket_worker(
|
||||||
let response_consumer_id = ConsumerId(response_receivers.consumer_id().unwrap());
|
let response_consumer_id = ConsumerId(response_receivers.consumer_id().unwrap());
|
||||||
|
|
||||||
let connection_slab = Rc::new(RefCell::new(Slab::new()));
|
let connection_slab = Rc::new(RefCell::new(Slab::new()));
|
||||||
let connections_to_remove = Rc::new(RefCell::new(Vec::new()));
|
|
||||||
|
|
||||||
// Periodically remove closed connections
|
// Periodically remove closed connections
|
||||||
TimerActionRepeat::repeat(
|
TimerActionRepeat::repeat(enclose!((config, connection_slab) move || {
|
||||||
enclose!((config, connection_slab, connections_to_remove) move || {
|
clean_connections(
|
||||||
remove_closed_connections(
|
config.clone(),
|
||||||
config.clone(),
|
connection_slab.clone(),
|
||||||
connection_slab.clone(),
|
)
|
||||||
connections_to_remove.clone(),
|
}));
|
||||||
)
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
for (_, response_receiver) in response_receivers.streams() {
|
for (_, response_receiver) in response_receivers.streams() {
|
||||||
spawn_local(receive_responses(
|
spawn_local(receive_responses(
|
||||||
|
|
@ -96,11 +95,14 @@ pub async fn run_socket_worker(
|
||||||
match stream {
|
match stream {
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
let (response_sender, response_receiver) = new_bounded(config.request_workers);
|
let (response_sender, response_receiver) = new_bounded(config.request_workers);
|
||||||
let key = connection_slab
|
|
||||||
.borrow_mut()
|
|
||||||
.insert(ConnectionReference { response_sender });
|
|
||||||
|
|
||||||
spawn_local(enclose!((config, access_list, request_senders, tls_config, connections_to_remove) async move {
|
let key = connection_slab.borrow_mut().insert(ConnectionReference {
|
||||||
|
task_handle: None,
|
||||||
|
response_sender,
|
||||||
|
valid_until: ValidUntil::new(config.cleaning.max_connection_idle),
|
||||||
|
});
|
||||||
|
|
||||||
|
let task_handle = spawn_local(enclose!((config, access_list, request_senders, tls_config, connection_slab) async move {
|
||||||
if let Err(err) = Connection::run(
|
if let Err(err) = Connection::run(
|
||||||
config,
|
config,
|
||||||
access_list,
|
access_list,
|
||||||
|
|
@ -109,14 +111,19 @@ pub async fn run_socket_worker(
|
||||||
response_consumer_id,
|
response_consumer_id,
|
||||||
ConnectionId(key),
|
ConnectionId(key),
|
||||||
tls_config,
|
tls_config,
|
||||||
|
connection_slab.clone(),
|
||||||
stream
|
stream
|
||||||
).await {
|
).await {
|
||||||
::log::debug!("Connection::run() error: {:?}", err);
|
::log::debug!("Connection::run() error: {:?}", err);
|
||||||
}
|
}
|
||||||
|
|
||||||
connections_to_remove.borrow_mut().push(key);
|
connection_slab.borrow_mut().try_remove(key);
|
||||||
}))
|
}))
|
||||||
.detach();
|
.detach();
|
||||||
|
|
||||||
|
if let Some(reference) = connection_slab.borrow_mut().get_mut(key) {
|
||||||
|
reference.task_handle = Some(task_handle);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
::log::error!("accept connection: {:?}", err);
|
::log::error!("accept connection: {:?}", err);
|
||||||
|
|
@ -125,26 +132,28 @@ pub async fn run_socket_worker(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn remove_closed_connections(
|
async fn clean_connections(
|
||||||
config: Rc<Config>,
|
config: Rc<Config>,
|
||||||
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
|
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
|
||||||
connections_to_remove: Rc<RefCell<Vec<usize>>>,
|
|
||||||
) -> Option<Duration> {
|
) -> Option<Duration> {
|
||||||
let connections_to_remove = connections_to_remove.replace(Vec::new());
|
let now = Instant::now();
|
||||||
|
|
||||||
for connection_id in connections_to_remove {
|
connection_slab.borrow_mut().retain(|_, reference| {
|
||||||
if let Some(_) = connection_slab.borrow_mut().try_remove(connection_id) {
|
let keep = reference.valid_until.0 > now;
|
||||||
::log::debug!("removed connection with id {}", connection_id);
|
|
||||||
} else {
|
if !keep {
|
||||||
::log::error!(
|
if let Some(ref handle) = reference.task_handle {
|
||||||
"couldn't remove connection with id {}, it is not in connection slab",
|
handle.cancel();
|
||||||
connection_id
|
}
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
keep
|
||||||
|
});
|
||||||
|
|
||||||
|
connection_slab.borrow_mut().shrink_to_fit();
|
||||||
|
|
||||||
Some(Duration::from_secs(
|
Some(Duration::from_secs(
|
||||||
config.cleaning.torrent_cleaning_interval,
|
config.cleaning.connection_cleaning_interval,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -170,6 +179,7 @@ struct Connection {
|
||||||
request_senders: Rc<Senders<ChannelRequest>>,
|
request_senders: Rc<Senders<ChannelRequest>>,
|
||||||
response_receiver: LocalReceiver<ChannelResponse>,
|
response_receiver: LocalReceiver<ChannelResponse>,
|
||||||
response_consumer_id: ConsumerId,
|
response_consumer_id: ConsumerId,
|
||||||
|
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
|
||||||
stream: TlsStream<TcpStream>,
|
stream: TlsStream<TcpStream>,
|
||||||
peer_addr: CanonicalSocketAddr,
|
peer_addr: CanonicalSocketAddr,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
|
|
@ -187,6 +197,7 @@ impl Connection {
|
||||||
response_consumer_id: ConsumerId,
|
response_consumer_id: ConsumerId,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
tls_config: Arc<TlsConfig>,
|
tls_config: Arc<TlsConfig>,
|
||||||
|
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
|
||||||
stream: TcpStream,
|
stream: TcpStream,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let peer_addr = stream
|
let peer_addr = stream
|
||||||
|
|
@ -207,6 +218,7 @@ impl Connection {
|
||||||
request_senders: request_senders.clone(),
|
request_senders: request_senders.clone(),
|
||||||
response_receiver,
|
response_receiver,
|
||||||
response_consumer_id,
|
response_consumer_id,
|
||||||
|
connection_slab,
|
||||||
stream,
|
stream,
|
||||||
peer_addr,
|
peer_addr,
|
||||||
connection_id,
|
connection_id,
|
||||||
|
|
@ -289,12 +301,19 @@ impl Connection {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Take a request and:
|
/// Take a request and:
|
||||||
|
/// - Update connection ValidUntil
|
||||||
/// - Return error response if request is not allowed
|
/// - Return error response if request is not allowed
|
||||||
/// - If it is an announce request, send it to request workers an await a
|
/// - If it is an announce request, send it to request workers an await a
|
||||||
/// response
|
/// response
|
||||||
/// - If it is a scrape requests, split it up, pass on the parts to
|
/// - If it is a scrape requests, split it up, pass on the parts to
|
||||||
/// relevant request workers and await a response
|
/// relevant request workers and await a response
|
||||||
async fn handle_request(&mut self, request: Request) -> anyhow::Result<Response> {
|
async fn handle_request(&mut self, request: Request) -> anyhow::Result<Response> {
|
||||||
|
if let Ok(mut slab) = self.connection_slab.try_borrow_mut() {
|
||||||
|
if let Some(reference) = slab.get_mut(self.connection_id.0) {
|
||||||
|
reference.valid_until = ValidUntil::new(self.config.cleaning.max_connection_idle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
match request {
|
match request {
|
||||||
Request::Announce(request) => {
|
Request::Announce(request) => {
|
||||||
let info_hash = request.info_hash;
|
let info_hash = request.info_hash;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue