aquatic_ws: wait for and quit on socket bind errors in workers

This commit is contained in:
Joakim Frostegård 2020-05-23 16:38:31 +02:00
parent 078a8c2868
commit a596ee155a
4 changed files with 46 additions and 3 deletions

View file

@ -3,7 +3,6 @@
## aquatic_ws ## aquatic_ws
* is it even necessary to check if event is readable in poll, since that * is it even necessary to check if event is readable in poll, since that
is all we're listening for? is all we're listening for?
* error in workers: print it, exit program with non-zero exit code
* privdrop * privdrop
* add sensible logging method, maybe stderrlog with quiet as default * add sensible logging method, maybe stderrlog with quiet as default

View file

@ -126,4 +126,8 @@ impl OutMessageSender {
){ ){
self.0[meta.worker_index].send((meta, message)); self.0[meta.worker_index].send((meta, message));
} }
} }
pub type SocketWorkerStatus = Option<Result<(), String>>;
pub type SocketWorkerStatuses = Arc<Mutex<Vec<SocketWorkerStatus>>>;

View file

@ -1,7 +1,9 @@
use std::time::Duration; use std::time::Duration;
use std::fs::File; use std::fs::File;
use std::io::Read; use std::io::Read;
use std::sync::Arc;
use native_tls::{Identity, TlsAcceptor}; use native_tls::{Identity, TlsAcceptor};
use parking_lot::Mutex;
pub mod common; pub mod common;
pub mod config; pub mod config;
@ -23,8 +25,19 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let mut out_message_senders = Vec::new(); let mut out_message_senders = Vec::new();
let socket_worker_statuses: SocketWorkerStatuses = {
let mut statuses = Vec::new();
for _ in 0..config.socket_workers {
statuses.push(None);
}
Arc::new(Mutex::new(statuses))
};
for i in 0..config.socket_workers { for i in 0..config.socket_workers {
let config = config.clone(); let config = config.clone();
let socket_worker_statuses = socket_worker_statuses.clone();
let in_message_sender = in_message_sender.clone(); let in_message_sender = in_message_sender.clone();
let opt_tls_acceptor = opt_tls_acceptor.clone(); let opt_tls_acceptor = opt_tls_acceptor.clone();
@ -36,6 +49,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
network::run_socket_worker( network::run_socket_worker(
config, config,
i, i,
socket_worker_statuses,
in_message_sender, in_message_sender,
out_message_receiver, out_message_receiver,
opt_tls_acceptor opt_tls_acceptor
@ -43,6 +57,27 @@ pub fn run(config: Config) -> anyhow::Result<()> {
}); });
} }
// Wait for socket worker statuses. On error from any, quit program.
// On success from all, continue program.
loop {
::std::thread::sleep(::std::time::Duration::from_millis(10));
if let Some(statuses) = socket_worker_statuses.try_lock(){
for opt_status in statuses.iter(){
match opt_status {
Some(Err(err)) => {
return Err(::anyhow::anyhow!(err.to_owned()));
},
_ => {},
}
}
if statuses.iter().all(Option::is_some){
break
}
}
}
let out_message_sender = OutMessageSender::new(out_message_senders); let out_message_sender = OutMessageSender::new(out_message_senders);
{ {

View file

@ -22,12 +22,15 @@ use utils::*;
pub fn run_socket_worker( pub fn run_socket_worker(
config: Config, config: Config,
socket_worker_index: usize, socket_worker_index: usize,
socket_worker_statuses: SocketWorkerStatuses,
in_message_sender: InMessageSender, in_message_sender: InMessageSender,
out_message_receiver: OutMessageReceiver, out_message_receiver: OutMessageReceiver,
opt_tls_acceptor: Option<TlsAcceptor>, opt_tls_acceptor: Option<TlsAcceptor>,
){ ){
match create_listener(&config){ match create_listener(&config){
Ok(listener) => { Ok(listener) => {
socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(()));
run_poll_loop( run_poll_loop(
config, config,
socket_worker_index, socket_worker_index,
@ -38,7 +41,9 @@ pub fn run_socket_worker(
); );
}, },
Err(err) => { Err(err) => {
eprintln!("Couldn't create TCP listener: {}", err) socket_worker_statuses.lock()[socket_worker_index] = Some(
Err(format!("Couldn't create TCP listener: {}", err))
);
} }
} }
} }