aquatic_http: use signals for reloading access list

This commit is contained in:
Joakim Frostegård 2021-11-02 23:16:18 +01:00
parent 992f4df635
commit 9a1993d72e
6 changed files with 103 additions and 111 deletions

View file

@ -6,7 +6,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use aquatic_common::access_list::AccessList;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_http_protocol::common::InfoHash;
use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest};
use aquatic_http_protocol::response::{
@ -41,29 +41,16 @@ struct ConnectionReference {
response_sender: LocalSender<ChannelResponse>,
}
struct Connection {
config: Rc<Config>,
access_list: Rc<RefCell<AccessList>>,
request_senders: Rc<Senders<ChannelRequest>>,
response_receiver: LocalReceiver<ChannelResponse>,
response_consumer_id: ConsumerId,
stream: TlsStream<TcpStream>,
peer_addr: SocketAddr,
connection_id: ConnectionId,
request_buffer: [u8; MAX_REQUEST_SIZE],
request_buffer_position: usize,
}
pub async fn run_socket_worker(
config: Config,
state: State,
tls_config: Arc<TlsConfig>,
request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
response_mesh_builder: MeshBuilder<ChannelResponse, Partial>,
num_bound_sockets: Arc<AtomicUsize>,
access_list: AccessList,
) {
let config = Rc::new(config);
let access_list = Rc::new(RefCell::new(access_list));
let access_list = state.access_list;
let listener = TcpListener::bind(config.network.address).expect("bind socket");
num_bound_sockets.fetch_add(1, Ordering::SeqCst);
@ -77,15 +64,6 @@ pub async fn run_socket_worker(
let connection_slab = Rc::new(RefCell::new(Slab::new()));
let connections_to_remove = Rc::new(RefCell::new(Vec::new()));
// Periodically update access list
TimerActionRepeat::repeat(enclose!((config, access_list) move || {
enclose!((config, access_list) move || async move {
update_access_list(config.clone(), access_list.clone()).await;
Some(Duration::from_secs(config.cleaning.interval))
})()
}));
// Periodically remove closed connections
TimerActionRepeat::repeat(
enclose!((config, connection_slab, connections_to_remove) move || {
@ -177,10 +155,23 @@ async fn receive_responses(
}
}
struct Connection {
config: Rc<Config>,
access_list_cache: AccessListCache,
request_senders: Rc<Senders<ChannelRequest>>,
response_receiver: LocalReceiver<ChannelResponse>,
response_consumer_id: ConsumerId,
stream: TlsStream<TcpStream>,
peer_addr: SocketAddr,
connection_id: ConnectionId,
request_buffer: [u8; MAX_REQUEST_SIZE],
request_buffer_position: usize,
}
impl Connection {
async fn run(
config: Rc<Config>,
access_list: Rc<RefCell<AccessList>>,
access_list: Arc<AccessListArcSwap>,
request_senders: Rc<Senders<ChannelRequest>>,
response_receiver: LocalReceiver<ChannelResponse>,
response_consumer_id: ConsumerId,
@ -197,7 +188,7 @@ impl Connection {
let mut conn = Connection {
config: config.clone(),
access_list: access_list.clone(),
access_list_cache: create_access_list_cache(&access_list),
request_senders: request_senders.clone(),
response_receiver,
response_consumer_id,
@ -297,14 +288,14 @@ impl Connection {
/// response
/// - If it is a scrape requests, split it up, pass on the parts to
/// relevant request workers and await a response
async fn handle_request(&self, request: Request) -> anyhow::Result<Response> {
async fn handle_request(&mut self, request: Request) -> anyhow::Result<Response> {
match request {
Request::Announce(request) => {
let info_hash = request.info_hash;
if self
.access_list
.borrow()
.access_list_cache
.load()
.allows(self.config.access_list.mode, &info_hash.0)
{
let request = ChannelRequest::Announce {