http: quit if any worker thread quits

This commit is contained in:
Joakim Frostegård 2024-02-03 22:34:42 +01:00
parent 4ca73630c4
commit d7e06468c3
8 changed files with 151 additions and 145 deletions

View file

@ -10,7 +10,7 @@ use std::time::Duration;
use anyhow::Context;
use aquatic_common::privileges::PrivilegeDropper;
use aquatic_common::rustls_config::RustlsConfig;
use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ServerStartInstant};
use aquatic_common::{CanonicalSocketAddr, ServerStartInstant};
use arc_swap::ArcSwap;
use futures_lite::future::race;
use futures_lite::StreamExt;
@ -32,7 +32,6 @@ struct ConnectionHandle {
#[allow(clippy::too_many_arguments)]
pub async fn run_socket_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
opt_tls_config: Option<Arc<ArcSwap<RustlsConfig>>>,
@ -40,13 +39,16 @@ pub async fn run_socket_worker(
priv_dropper: PrivilegeDropper,
server_start_instant: ServerStartInstant,
worker_index: usize,
) {
) -> anyhow::Result<()> {
let config = Rc::new(config);
let access_list = state.access_list;
let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener");
let listener = create_tcp_listener(&config, priv_dropper).context("create tcp listener")?;
let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
let (request_senders, _) = request_mesh_builder
.join(Role::Producer)
.await
.map_err(|err| anyhow::anyhow!("join request mesh: {:#}", err))?;
let request_senders = Rc::new(request_senders);
let connection_handles = Rc::new(RefCell::new(HopSlotMap::with_key()));
@ -145,6 +147,8 @@ pub async fn run_socket_worker(
}
}
}
Ok(())
}
async fn clean_connections(

View file

@ -11,7 +11,7 @@ use glommio::{enclose, prelude::*};
use rand::prelude::SmallRng;
use rand::SeedableRng;
use aquatic_common::{PanicSentinel, ServerStartInstant, ValidUntil};
use aquatic_common::{ServerStartInstant, ValidUntil};
use crate::common::*;
use crate::config::Config;
@ -19,14 +19,16 @@ use crate::config::Config;
use self::storage::TorrentMaps;
pub async fn run_swarm_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
request_mesh_builder: MeshBuilder<ChannelRequest, Partial>,
server_start_instant: ServerStartInstant,
worker_index: usize,
) {
let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap();
) -> anyhow::Result<()> {
let (_, mut request_receivers) = request_mesh_builder
.join(Role::Consumer)
.await
.map_err(|err| anyhow::anyhow!("join request mesh: {:#}", err))?;
let torrents = Rc::new(RefCell::new(TorrentMaps::new(worker_index)));
let access_list = state.access_list;
@ -82,6 +84,8 @@ pub async fn run_swarm_worker(
for handle in handles {
handle.await;
}
Ok(())
}
async fn handle_request_stream<S>(