diff --git a/Cargo.lock b/Cargo.lock index 1c5c1a3..6cdf533 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -33,6 +33,7 @@ dependencies = [ name = "aquatic_udp" version = "0.1.0" dependencies = [ + "anyhow", "aquatic_common", "bittorrent_udp", "cli_helpers", @@ -55,6 +56,7 @@ dependencies = [ name = "aquatic_udp_bench" version = "0.1.0" dependencies = [ + "anyhow", "aquatic_udp", "bittorrent_udp", "cli_helpers", @@ -72,6 +74,7 @@ dependencies = [ name = "aquatic_udp_load_test" version = "0.1.0" dependencies = [ + "anyhow", "bittorrent_udp", "cli_helpers", "crossbeam-channel", @@ -91,6 +94,7 @@ dependencies = [ name = "aquatic_ws" version = "0.1.0" dependencies = [ + "anyhow", "aquatic_common", "cli_helpers", "either", diff --git a/TODO.md b/TODO.md index 68aead1..de95477 100644 --- a/TODO.md +++ b/TODO.md @@ -3,10 +3,7 @@ ## aquatic_ws * ipv4 / ipv6 split state? * network - * handle tls certificate parse errors etc better - * parse once only - * exit with error message, not with panic, use anyhow - * handle socket binding errors better (print them and exit), use anyhow + * panic/error in workers: exit program with non-zero exit code * send/recv buffer size config * limit ws message sizes? * poll: check if event is readable first, otherwise run `continue` diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 69172d2..b89eaab 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -13,6 +13,7 @@ path = "src/lib/lib.rs" name = "aquatic_udp" [dependencies] +anyhow = "1" aquatic_common = { path = "../aquatic_common" } bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 07f0476..089d31a 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -15,7 +15,7 @@ use config::Config; use common::State; -pub fn run(config: Config){ +pub fn run(config: Config) -> ::anyhow::Result<()> { let state = State::new(); let (request_sender, request_receiver) = unbounded(); diff --git a/aquatic_udp_bench/Cargo.toml b/aquatic_udp_bench/Cargo.toml index 4900bdc..03257cb 100644 --- a/aquatic_udp_bench/Cargo.toml +++ b/aquatic_udp_bench/Cargo.toml @@ -15,6 +15,7 @@ name = "aquatic_udp_bench_handlers" name = "plot_pareto" [dependencies] +anyhow = "1" aquatic_udp = { path = "../aquatic_udp" } bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } diff --git a/aquatic_udp_bench/src/bin/aquatic_udp_bench_handlers/main.rs b/aquatic_udp_bench/src/bin/aquatic_udp_bench_handlers/main.rs index f81b674..c450134 100644 --- a/aquatic_udp_bench/src/bin/aquatic_udp_bench_handlers/main.rs +++ b/aquatic_udp_bench/src/bin/aquatic_udp_bench_handlers/main.rs @@ -45,7 +45,7 @@ fn main(){ } -pub fn run(bench_config: BenchConfig){ +pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { // Setup common state, spawn request handlers let state = State::new(); @@ -111,6 +111,8 @@ pub fn run(bench_config: BenchConfig){ print_results("Connect: ", c.0, c.1); print_results("Announce:", a.0, a.1); print_results("Scrape: ", s.0, s.1); + + Ok(()) } diff --git a/aquatic_udp_load_test/Cargo.toml b/aquatic_udp_load_test/Cargo.toml index 8c7b62a..da6fca1 100644 --- a/aquatic_udp_load_test/Cargo.toml +++ b/aquatic_udp_load_test/Cargo.toml @@ -9,6 +9,7 @@ license = "Apache-2.0" name = "aquatic_udp_load_test" [dependencies] +anyhow = "1" bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } crossbeam-channel = "0.4" diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index 239d543..f8a6127 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -32,7 +32,7 @@ pub fn main(){ } -fn run(config: Config){ +fn run(config: Config) -> ::anyhow::Result<()> { if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape == 0 { panic!("Error: at least one weight must be larger than zero."); } @@ -129,7 +129,9 @@ fn run(config: Config){ monitor_statistics( state, &config - ) + ); + + Ok(()) } diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 2601720..0a19ce9 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -14,6 +14,7 @@ name = "aquatic_ws" path = "src/bin/main.rs" [dependencies] +anyhow = "1" aquatic_common = { path = "../aquatic_common" } cli_helpers = { path = "../cli_helpers" } either = "1" diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index a58185d..510cb4a 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -1,4 +1,7 @@ use std::time::Duration; +use std::fs::File; +use std::io::Read; +use native_tls::{Identity, TlsAcceptor}; pub mod common; pub mod config; @@ -11,7 +14,9 @@ use common::*; use config::Config; -pub fn run(config: Config){ +pub fn run(config: Config) -> anyhow::Result<()> { + let opt_tls_acceptor = create_tls_acceptor(&config)?; + let state = State::default(); let (in_message_sender, in_message_receiver) = ::flume::unbounded(); @@ -21,6 +26,7 @@ pub fn run(config: Config){ for i in 0..config.socket_workers { let config = config.clone(); let in_message_sender = in_message_sender.clone(); + let opt_tls_acceptor = opt_tls_acceptor.clone(); let (out_message_sender, out_message_receiver) = ::flume::unbounded(); @@ -32,6 +38,7 @@ pub fn run(config: Config){ i, in_message_sender, out_message_receiver, + opt_tls_acceptor ); }); } @@ -57,4 +64,27 @@ pub fn run(config: Config){ tasks::clean_torrents(&state); } +} + + +pub fn create_tls_acceptor( + config: &Config, +) -> anyhow::Result> { + if config.network.use_tls { + let mut identity_bytes = Vec::new(); + let mut file = File::open(&config.network.tls_pkcs12_path)?; + + file.read_to_end(&mut identity_bytes)?; + + let identity = Identity::from_pkcs12( + &mut identity_bytes, + &config.network.tls_pkcs12_password + )?; + + let acceptor = TlsAcceptor::new(identity)?; + + Ok(Some(acceptor)) + } else { + Ok(None) + } } \ No newline at end of file diff --git a/aquatic_ws/src/lib/network/mod.rs b/aquatic_ws/src/lib/network/mod.rs index 8239fd6..31a21d6 100644 --- a/aquatic_ws/src/lib/network/mod.rs +++ b/aquatic_ws/src/lib/network/mod.rs @@ -23,12 +23,39 @@ pub fn run_socket_worker( socket_worker_index: usize, in_message_sender: InMessageSender, out_message_receiver: OutMessageReceiver, + opt_tls_acceptor: Option, +){ + match create_listener(&config){ + Ok(listener) => { + run_poll_loop( + config, + socket_worker_index, + in_message_sender, + out_message_receiver, + listener, + opt_tls_acceptor + ); + }, + Err(err) => { + eprintln!("Couldn't create TCP listener: {}", err) + } + } +} + + +pub fn run_poll_loop( + config: Config, + socket_worker_index: usize, + in_message_sender: InMessageSender, + out_message_receiver: OutMessageReceiver, + listener: ::std::net::TcpListener, + opt_tls_acceptor: Option, ){ let poll_timeout = Duration::from_millis( config.network.poll_timeout_milliseconds ); - let mut listener = TcpListener::from_std(create_listener(&config)); + let mut listener = TcpListener::from_std(listener); let mut poll = Poll::new().expect("create poll"); let mut events = Events::with_capacity(config.network.poll_event_capacity); @@ -36,12 +63,6 @@ pub fn run_socket_worker( .register(&mut listener, Token(0), Interest::READABLE) .unwrap(); - let opt_tls_acceptor = if config.network.use_tls { - Some(create_tls_acceptor(&config)) - } else { - None - }; - let mut connections: ConnectionMap = HashMap::new(); let mut poll_token_counter = Token(0usize); diff --git a/aquatic_ws/src/lib/network/utils.rs b/aquatic_ws/src/lib/network/utils.rs index 0cbb840..45ca98e 100644 --- a/aquatic_ws/src/lib/network/utils.rs +++ b/aquatic_ws/src/lib/network/utils.rs @@ -1,9 +1,6 @@ -use std::fs::File; -use std::io::Read; use std::time::Instant; use mio::Token; -use native_tls::{Identity, TlsAcceptor}; use net2::{TcpBuilder, unix::UnixTcpBuilderExt}; use crate::config::Config; @@ -11,53 +8,26 @@ use crate::config::Config; use super::connection::*; -pub fn create_listener(config: &Config) -> ::std::net::TcpListener { - let mut builder = &{ - if config.network.address.is_ipv4(){ - TcpBuilder::new_v4().expect("socket: build") - } else { - TcpBuilder::new_v6().expect("socket: build") - } - }; +pub fn create_listener( + config: &Config +) -> ::anyhow::Result<::std::net::TcpListener> { + let builder = if config.network.address.is_ipv4(){ + TcpBuilder::new_v4() + } else { + TcpBuilder::new_v6() + }?; - builder = builder.reuse_port(true) - .expect("socket: set reuse port"); + let builder = builder.reuse_port(true)?; + let builder = builder.bind(&config.network.address)?; - builder = builder.bind(&config.network.address) - .expect(&format!("socket: bind to {}", &config.network.address)); + let listener = builder.listen(128)?; - let listener = builder.listen(128) - .expect("tcpbuilder to tcp listener"); + listener.set_nonblocking(true)?; - listener.set_nonblocking(true) - .expect("socket: set nonblocking"); - - listener + Ok(listener) } -pub fn create_tls_acceptor( - config: &Config, -) -> TlsAcceptor { - let mut identity_bytes = Vec::new(); - let mut file = File::open(&config.network.tls_pkcs12_path) - .expect("open pkcs12 file"); - - file.read_to_end(&mut identity_bytes).expect("read pkcs12 file"); - - let identity = Identity::from_pkcs12( - &mut identity_bytes, - &config.network.tls_pkcs12_password - ).expect("create pkcs12 identity"); - - let acceptor = TlsAcceptor::new(identity) - .expect("create TlsAcceptor"); - - acceptor -} - - - /// Don't bother with deregistering from Poll. In my understanding, this is /// done automatically when the stream is dropped, as long as there are no /// other references to the file descriptor, such as when it is accessed diff --git a/cli_helpers/src/lib.rs b/cli_helpers/src/lib.rs index 020e318..858cd34 100644 --- a/cli_helpers/src/lib.rs +++ b/cli_helpers/src/lib.rs @@ -18,6 +18,49 @@ struct AppOptions { } +pub fn run_app_with_cli_and_config( + title: &str, + // Function that takes config file and runs application + app_fn: fn(T) -> anyhow::Result<()>, +) where T: Default + Serialize + DeserializeOwned { + ::std::process::exit(match run_inner(title, app_fn) { + Ok(()) => 0, + Err(err) => { + print_help(title, Some(err)); + + 1 + }, + }) +} + + +fn run_inner( + title: &str, + // Function that takes config file and runs application + app_fn: fn(T) -> anyhow::Result<()>, +) -> anyhow::Result<()> where T: Default + Serialize + DeserializeOwned { + let args: Vec = ::std::env::args().collect(); + + let opts = AppOptions::parse_args_default(&args[1..])?; + + if opts.help_requested(){ + print_help(title, None); + + Ok(()) + } else if opts.print_config { + print!("{}", default_config_as_toml::()); + + Ok(()) + } else if let Some(config_file) = opts.config_file { + let config = config_from_toml_file(config_file)?; + + app_fn(config) + } else { + app_fn(T::default()) + } +} + + fn config_from_toml_file(path: String) -> anyhow::Result where T: DeserializeOwned { @@ -28,6 +71,7 @@ fn config_from_toml_file(path: String) -> anyhow::Result toml::from_str(&data).map_err(|e| anyhow::anyhow!("Parse failed: {}", e)) } + fn default_config_as_toml() -> String where T: Default + Serialize { @@ -36,40 +80,7 @@ fn default_config_as_toml() -> String } -pub fn run_app_with_cli_and_config( - title: &str, - // Function that takes config file and runs application - app_fn: fn(T), -) where T: Default + Serialize + DeserializeOwned { - let args: Vec = ::std::env::args().collect(); - - match AppOptions::parse_args_default(&args[1..]){ - Ok(opts) => { - if opts.help_requested(){ - print_help(title, None); - } else if opts.print_config { - print!("{}", default_config_as_toml::()); - } else if let Some(config_file) = opts.config_file { - match config_from_toml_file(config_file){ - Ok(config) => app_fn(config), - Err(err) => { - eprintln!("Error while reading config file: {}", err); - - ::std::process::exit(1); - } - } - } else { - app_fn(T::default()) - } - }, - Err(err) => { - print_help(title, Some(&format!("{}", err))) - } - } -} - - -fn print_help(title: &str, opt_error: Option<&str>){ +fn print_help(title: &str, opt_error: Option){ println!("{}", title); if let Some(error) = opt_error {