cli_helpers: use anyhow in app fn; aquatic_ws: reorganize error handling

This commit is contained in:
Joakim Frostegård 2020-05-23 14:05:50 +02:00
parent 1efe6f96c5
commit 526faa9aab
13 changed files with 134 additions and 93 deletions

4
Cargo.lock generated
View file

@ -33,6 +33,7 @@ dependencies = [
name = "aquatic_udp" name = "aquatic_udp"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"aquatic_common", "aquatic_common",
"bittorrent_udp", "bittorrent_udp",
"cli_helpers", "cli_helpers",
@ -55,6 +56,7 @@ dependencies = [
name = "aquatic_udp_bench" name = "aquatic_udp_bench"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"aquatic_udp", "aquatic_udp",
"bittorrent_udp", "bittorrent_udp",
"cli_helpers", "cli_helpers",
@ -72,6 +74,7 @@ dependencies = [
name = "aquatic_udp_load_test" name = "aquatic_udp_load_test"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"bittorrent_udp", "bittorrent_udp",
"cli_helpers", "cli_helpers",
"crossbeam-channel", "crossbeam-channel",
@ -91,6 +94,7 @@ dependencies = [
name = "aquatic_ws" name = "aquatic_ws"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"aquatic_common", "aquatic_common",
"cli_helpers", "cli_helpers",
"either", "either",

View file

@ -3,10 +3,7 @@
## aquatic_ws ## aquatic_ws
* ipv4 / ipv6 split state? * ipv4 / ipv6 split state?
* network * network
* handle tls certificate parse errors etc better * panic/error in workers: exit program with non-zero exit code
* parse once only
* exit with error message, not with panic, use anyhow
* handle socket binding errors better (print them and exit), use anyhow
* send/recv buffer size config * send/recv buffer size config
* limit ws message sizes? * limit ws message sizes?
* poll: check if event is readable first, otherwise run `continue` * poll: check if event is readable first, otherwise run `continue`

View file

@ -13,6 +13,7 @@ path = "src/lib/lib.rs"
name = "aquatic_udp" name = "aquatic_udp"
[dependencies] [dependencies]
anyhow = "1"
aquatic_common = { path = "../aquatic_common" } aquatic_common = { path = "../aquatic_common" }
bittorrent_udp = { path = "../bittorrent_udp" } bittorrent_udp = { path = "../bittorrent_udp" }
cli_helpers = { path = "../cli_helpers" } cli_helpers = { path = "../cli_helpers" }

View file

@ -15,7 +15,7 @@ use config::Config;
use common::State; use common::State;
pub fn run(config: Config){ pub fn run(config: Config) -> ::anyhow::Result<()> {
let state = State::new(); let state = State::new();
let (request_sender, request_receiver) = unbounded(); let (request_sender, request_receiver) = unbounded();

View file

@ -15,6 +15,7 @@ name = "aquatic_udp_bench_handlers"
name = "plot_pareto" name = "plot_pareto"
[dependencies] [dependencies]
anyhow = "1"
aquatic_udp = { path = "../aquatic_udp" } aquatic_udp = { path = "../aquatic_udp" }
bittorrent_udp = { path = "../bittorrent_udp" } bittorrent_udp = { path = "../bittorrent_udp" }
cli_helpers = { path = "../cli_helpers" } cli_helpers = { path = "../cli_helpers" }

View file

@ -45,7 +45,7 @@ fn main(){
} }
pub fn run(bench_config: BenchConfig){ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> {
// Setup common state, spawn request handlers // Setup common state, spawn request handlers
let state = State::new(); let state = State::new();
@ -111,6 +111,8 @@ pub fn run(bench_config: BenchConfig){
print_results("Connect: ", c.0, c.1); print_results("Connect: ", c.0, c.1);
print_results("Announce:", a.0, a.1); print_results("Announce:", a.0, a.1);
print_results("Scrape: ", s.0, s.1); print_results("Scrape: ", s.0, s.1);
Ok(())
} }

View file

@ -9,6 +9,7 @@ license = "Apache-2.0"
name = "aquatic_udp_load_test" name = "aquatic_udp_load_test"
[dependencies] [dependencies]
anyhow = "1"
bittorrent_udp = { path = "../bittorrent_udp" } bittorrent_udp = { path = "../bittorrent_udp" }
cli_helpers = { path = "../cli_helpers" } cli_helpers = { path = "../cli_helpers" }
crossbeam-channel = "0.4" crossbeam-channel = "0.4"

View file

@ -32,7 +32,7 @@ pub fn main(){
} }
fn run(config: Config){ fn run(config: Config) -> ::anyhow::Result<()> {
if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape == 0 { if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape == 0 {
panic!("Error: at least one weight must be larger than zero."); panic!("Error: at least one weight must be larger than zero.");
} }
@ -129,7 +129,9 @@ fn run(config: Config){
monitor_statistics( monitor_statistics(
state, state,
&config &config
) );
Ok(())
} }

View file

@ -14,6 +14,7 @@ name = "aquatic_ws"
path = "src/bin/main.rs" path = "src/bin/main.rs"
[dependencies] [dependencies]
anyhow = "1"
aquatic_common = { path = "../aquatic_common" } aquatic_common = { path = "../aquatic_common" }
cli_helpers = { path = "../cli_helpers" } cli_helpers = { path = "../cli_helpers" }
either = "1" either = "1"

View file

@ -1,4 +1,7 @@
use std::time::Duration; use std::time::Duration;
use std::fs::File;
use std::io::Read;
use native_tls::{Identity, TlsAcceptor};
pub mod common; pub mod common;
pub mod config; pub mod config;
@ -11,7 +14,9 @@ use common::*;
use config::Config; use config::Config;
pub fn run(config: Config){ pub fn run(config: Config) -> anyhow::Result<()> {
let opt_tls_acceptor = create_tls_acceptor(&config)?;
let state = State::default(); let state = State::default();
let (in_message_sender, in_message_receiver) = ::flume::unbounded(); let (in_message_sender, in_message_receiver) = ::flume::unbounded();
@ -21,6 +26,7 @@ pub fn run(config: Config){
for i in 0..config.socket_workers { for i in 0..config.socket_workers {
let config = config.clone(); let config = config.clone();
let in_message_sender = in_message_sender.clone(); let in_message_sender = in_message_sender.clone();
let opt_tls_acceptor = opt_tls_acceptor.clone();
let (out_message_sender, out_message_receiver) = ::flume::unbounded(); let (out_message_sender, out_message_receiver) = ::flume::unbounded();
@ -32,6 +38,7 @@ pub fn run(config: Config){
i, i,
in_message_sender, in_message_sender,
out_message_receiver, out_message_receiver,
opt_tls_acceptor
); );
}); });
} }
@ -57,4 +64,27 @@ pub fn run(config: Config){
tasks::clean_torrents(&state); tasks::clean_torrents(&state);
} }
}
pub fn create_tls_acceptor(
config: &Config,
) -> anyhow::Result<Option<TlsAcceptor>> {
if config.network.use_tls {
let mut identity_bytes = Vec::new();
let mut file = File::open(&config.network.tls_pkcs12_path)?;
file.read_to_end(&mut identity_bytes)?;
let identity = Identity::from_pkcs12(
&mut identity_bytes,
&config.network.tls_pkcs12_password
)?;
let acceptor = TlsAcceptor::new(identity)?;
Ok(Some(acceptor))
} else {
Ok(None)
}
} }

View file

@ -23,12 +23,39 @@ pub fn run_socket_worker(
socket_worker_index: usize, socket_worker_index: usize,
in_message_sender: InMessageSender, in_message_sender: InMessageSender,
out_message_receiver: OutMessageReceiver, out_message_receiver: OutMessageReceiver,
opt_tls_acceptor: Option<TlsAcceptor>,
){
match create_listener(&config){
Ok(listener) => {
run_poll_loop(
config,
socket_worker_index,
in_message_sender,
out_message_receiver,
listener,
opt_tls_acceptor
);
},
Err(err) => {
eprintln!("Couldn't create TCP listener: {}", err)
}
}
}
pub fn run_poll_loop(
config: Config,
socket_worker_index: usize,
in_message_sender: InMessageSender,
out_message_receiver: OutMessageReceiver,
listener: ::std::net::TcpListener,
opt_tls_acceptor: Option<TlsAcceptor>,
){ ){
let poll_timeout = Duration::from_millis( let poll_timeout = Duration::from_millis(
config.network.poll_timeout_milliseconds config.network.poll_timeout_milliseconds
); );
let mut listener = TcpListener::from_std(create_listener(&config)); let mut listener = TcpListener::from_std(listener);
let mut poll = Poll::new().expect("create poll"); let mut poll = Poll::new().expect("create poll");
let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut events = Events::with_capacity(config.network.poll_event_capacity);
@ -36,12 +63,6 @@ pub fn run_socket_worker(
.register(&mut listener, Token(0), Interest::READABLE) .register(&mut listener, Token(0), Interest::READABLE)
.unwrap(); .unwrap();
let opt_tls_acceptor = if config.network.use_tls {
Some(create_tls_acceptor(&config))
} else {
None
};
let mut connections: ConnectionMap = HashMap::new(); let mut connections: ConnectionMap = HashMap::new();
let mut poll_token_counter = Token(0usize); let mut poll_token_counter = Token(0usize);

View file

@ -1,9 +1,6 @@
use std::fs::File;
use std::io::Read;
use std::time::Instant; use std::time::Instant;
use mio::Token; use mio::Token;
use native_tls::{Identity, TlsAcceptor};
use net2::{TcpBuilder, unix::UnixTcpBuilderExt}; use net2::{TcpBuilder, unix::UnixTcpBuilderExt};
use crate::config::Config; use crate::config::Config;
@ -11,53 +8,26 @@ use crate::config::Config;
use super::connection::*; use super::connection::*;
pub fn create_listener(config: &Config) -> ::std::net::TcpListener { pub fn create_listener(
let mut builder = &{ config: &Config
if config.network.address.is_ipv4(){ ) -> ::anyhow::Result<::std::net::TcpListener> {
TcpBuilder::new_v4().expect("socket: build") let builder = if config.network.address.is_ipv4(){
} else { TcpBuilder::new_v4()
TcpBuilder::new_v6().expect("socket: build") } else {
} TcpBuilder::new_v6()
}; }?;
builder = builder.reuse_port(true) let builder = builder.reuse_port(true)?;
.expect("socket: set reuse port"); let builder = builder.bind(&config.network.address)?;
builder = builder.bind(&config.network.address) let listener = builder.listen(128)?;
.expect(&format!("socket: bind to {}", &config.network.address));
let listener = builder.listen(128) listener.set_nonblocking(true)?;
.expect("tcpbuilder to tcp listener");
listener.set_nonblocking(true) Ok(listener)
.expect("socket: set nonblocking");
listener
} }
pub fn create_tls_acceptor(
config: &Config,
) -> TlsAcceptor {
let mut identity_bytes = Vec::new();
let mut file = File::open(&config.network.tls_pkcs12_path)
.expect("open pkcs12 file");
file.read_to_end(&mut identity_bytes).expect("read pkcs12 file");
let identity = Identity::from_pkcs12(
&mut identity_bytes,
&config.network.tls_pkcs12_password
).expect("create pkcs12 identity");
let acceptor = TlsAcceptor::new(identity)
.expect("create TlsAcceptor");
acceptor
}
/// Don't bother with deregistering from Poll. In my understanding, this is /// Don't bother with deregistering from Poll. In my understanding, this is
/// done automatically when the stream is dropped, as long as there are no /// done automatically when the stream is dropped, as long as there are no
/// other references to the file descriptor, such as when it is accessed /// other references to the file descriptor, such as when it is accessed

View file

@ -18,6 +18,49 @@ struct AppOptions {
} }
pub fn run_app_with_cli_and_config<T>(
title: &str,
// Function that takes config file and runs application
app_fn: fn(T) -> anyhow::Result<()>,
) where T: Default + Serialize + DeserializeOwned {
::std::process::exit(match run_inner(title, app_fn) {
Ok(()) => 0,
Err(err) => {
print_help(title, Some(err));
1
},
})
}
fn run_inner<T>(
title: &str,
// Function that takes config file and runs application
app_fn: fn(T) -> anyhow::Result<()>,
) -> anyhow::Result<()> where T: Default + Serialize + DeserializeOwned {
let args: Vec<String> = ::std::env::args().collect();
let opts = AppOptions::parse_args_default(&args[1..])?;
if opts.help_requested(){
print_help(title, None);
Ok(())
} else if opts.print_config {
print!("{}", default_config_as_toml::<T>());
Ok(())
} else if let Some(config_file) = opts.config_file {
let config = config_from_toml_file(config_file)?;
app_fn(config)
} else {
app_fn(T::default())
}
}
fn config_from_toml_file<T>(path: String) -> anyhow::Result<T> fn config_from_toml_file<T>(path: String) -> anyhow::Result<T>
where T: DeserializeOwned where T: DeserializeOwned
{ {
@ -28,6 +71,7 @@ fn config_from_toml_file<T>(path: String) -> anyhow::Result<T>
toml::from_str(&data).map_err(|e| anyhow::anyhow!("Parse failed: {}", e)) toml::from_str(&data).map_err(|e| anyhow::anyhow!("Parse failed: {}", e))
} }
fn default_config_as_toml<T>() -> String fn default_config_as_toml<T>() -> String
where T: Default + Serialize where T: Default + Serialize
{ {
@ -36,40 +80,7 @@ fn default_config_as_toml<T>() -> String
} }
pub fn run_app_with_cli_and_config<T>( fn print_help(title: &str, opt_error: Option<anyhow::Error>){
title: &str,
// Function that takes config file and runs application
app_fn: fn(T),
) where T: Default + Serialize + DeserializeOwned {
let args: Vec<String> = ::std::env::args().collect();
match AppOptions::parse_args_default(&args[1..]){
Ok(opts) => {
if opts.help_requested(){
print_help(title, None);
} else if opts.print_config {
print!("{}", default_config_as_toml::<T>());
} else if let Some(config_file) = opts.config_file {
match config_from_toml_file(config_file){
Ok(config) => app_fn(config),
Err(err) => {
eprintln!("Error while reading config file: {}", err);
::std::process::exit(1);
}
}
} else {
app_fn(T::default())
}
},
Err(err) => {
print_help(title, Some(&format!("{}", err)))
}
}
}
fn print_help(title: &str, opt_error: Option<&str>){
println!("{}", title); println!("{}", title);
if let Some(error) = opt_error { if let Some(error) = opt_error {