From a852b290caeeef7b29481bb9371a5c497b140662 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Jul 2022 11:15:53 +0200 Subject: [PATCH 1/8] Add cli flag for printing parsed config --- TODO.md | 3 --- aquatic_common/src/cli.rs | 11 ++++++++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/TODO.md b/TODO.md index fa3d7e9..9c296d7 100644 --- a/TODO.md +++ b/TODO.md @@ -43,9 +43,6 @@ ## Low priority -* config - * add flag to print parsed config when starting - * aquatic_udp * what poll event capacity is actually needed? * stagger connection cleaning intervals? diff --git a/aquatic_common/src/cli.rs b/aquatic_common/src/cli.rs index 4763092..6c9a2e0 100644 --- a/aquatic_common/src/cli.rs +++ b/aquatic_common/src/cli.rs @@ -26,7 +26,7 @@ impl Default for LogLevel { } } -pub trait Config: Default + TomlConfig + DeserializeOwned { +pub trait Config: Default + TomlConfig + DeserializeOwned + std::fmt::Debug { fn get_log_level(&self) -> Option { None } @@ -36,6 +36,7 @@ pub trait Config: Default + TomlConfig + DeserializeOwned { pub struct Options { config_file: Option, print_config: bool, + print_parsed_config: bool, print_version: bool, } @@ -59,6 +60,9 @@ impl Options { "-p" | "--print-config" => { options.print_config = true; } + "-P" => { + options.print_parsed_config = true; + } "-v" | "--version" => { options.print_version = true; } @@ -148,6 +152,10 @@ where start_logger(log_level)?; } + if options.print_parsed_config { + println!("Running with configuration: {:#?}", config); + } + app_fn(config) } } @@ -162,6 +170,7 @@ where println!(" -c, --config-file Load config from this path"); println!(" -h, --help Print this help message"); println!(" -p, --print-config Print default config"); + println!(" -P Print parsed config"); println!(" -v, --version Print version information"); if let Some(error) = opt_error { From cedf0b61ca27e43a13a39ed1a395893c0035f907 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Jul 2022 11:24:07 +0200 Subject: [PATCH 2/8] Exit with error message on unrecognized config keys --- TODO.md | 2 +- aquatic_common/src/privileges.rs | 2 +- aquatic_http/src/config.rs | 8 ++++---- aquatic_http_load_test/src/config.rs | 4 ++-- aquatic_http_private/src/config.rs | 8 ++++---- aquatic_udp/src/config.rs | 10 +++++----- aquatic_udp_bench/src/config.rs | 1 + aquatic_udp_load_test/src/config.rs | 6 +++--- aquatic_ws/src/config.rs | 8 ++++---- aquatic_ws_load_test/src/config.rs | 4 ++-- 10 files changed, 27 insertions(+), 26 deletions(-) diff --git a/TODO.md b/TODO.md index 9c296d7..a447dd7 100644 --- a/TODO.md +++ b/TODO.md @@ -14,7 +14,7 @@ * Save JoinHandles * When preparing to quit because of PanicSentinel sending SIGTERM, loop through them, extract error and log it -* config: fail on unrecognized keys? + * Run cargo-deny in CI * aquatic_ws diff --git a/aquatic_common/src/privileges.rs b/aquatic_common/src/privileges.rs index 1e18b9f..9cb7db5 100644 --- a/aquatic_common/src/privileges.rs +++ b/aquatic_common/src/privileges.rs @@ -10,7 +10,7 @@ use serde::Deserialize; use aquatic_toml_config::TomlConfig; #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct PrivilegeConfig { /// Chroot and switch group and user after binding to sockets pub drop_privileges: bool, diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index 033ab35..e4a6a35 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -11,7 +11,7 @@ use aquatic_common::cli::LogLevel; /// aquatic_http configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct Config { /// Socket workers receive requests from the socket, parse them and send /// them on to the swarm workers. They then receive responses from the @@ -52,7 +52,7 @@ impl aquatic_common::cli::Config for Config { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct NetworkConfig { /// Bind to this address pub address: SocketAddr, @@ -82,7 +82,7 @@ impl Default for NetworkConfig { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct ProtocolConfig { /// Maximum number of torrents to accept in scrape request pub max_scrape_torrents: usize, @@ -103,7 +103,7 @@ impl Default for ProtocolConfig { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct CleaningConfig { /// Clean peers this often (seconds) pub torrent_cleaning_interval: u64, diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index 5179f75..5c1a35d 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -7,7 +7,7 @@ use serde::Deserialize; /// aquatic_http_load_test configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct Config { pub server_address: SocketAddr, pub log_level: LogLevel, @@ -34,7 +34,7 @@ impl aquatic_common::cli::Config for Config { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct TorrentConfig { pub number_of_torrents: usize, /// Pareto shape diff --git a/aquatic_http_private/src/config.rs b/aquatic_http_private/src/config.rs index 43e8e2c..d8bd496 100644 --- a/aquatic_http_private/src/config.rs +++ b/aquatic_http_private/src/config.rs @@ -8,7 +8,7 @@ use aquatic_common::cli::LogLevel; /// aquatic_http_private configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct Config { /// Socket workers receive requests from the socket, parse them and send /// them on to the swarm workers. They then receive responses from the @@ -50,7 +50,7 @@ impl aquatic_common::cli::Config for Config { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct NetworkConfig { /// Bind to this address pub address: SocketAddr, @@ -73,7 +73,7 @@ impl Default for NetworkConfig { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct ProtocolConfig { /// Maximum number of torrents to accept in scrape request pub max_scrape_torrents: usize, @@ -94,7 +94,7 @@ impl Default for ProtocolConfig { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct CleaningConfig { /// Clean peers this often (seconds) pub torrent_cleaning_interval: u64, diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 10cddd1..9a7c33d 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -8,7 +8,7 @@ use aquatic_toml_config::TomlConfig; /// aquatic_udp configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct Config { /// Socket workers receive requests from the socket, parse them and send /// them on to the swarm workers. They then receive responses from the @@ -65,7 +65,7 @@ impl aquatic_common::cli::Config for Config { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct NetworkConfig { /// Bind to this address pub address: SocketAddr, @@ -117,7 +117,7 @@ impl Default for NetworkConfig { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct ProtocolConfig { /// Maximum number of torrents to accept in scrape request pub max_scrape_torrents: u8, @@ -138,7 +138,7 @@ impl Default for ProtocolConfig { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct StatisticsConfig { /// Collect and print/write statistics this often (seconds) pub interval: u64, @@ -168,7 +168,7 @@ impl Default for StatisticsConfig { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct CleaningConfig { /// Clean torrents this often (seconds) pub torrent_cleaning_interval: u64, diff --git a/aquatic_udp_bench/src/config.rs b/aquatic_udp_bench/src/config.rs index a1425d8..20d8ac0 100644 --- a/aquatic_udp_bench/src/config.rs +++ b/aquatic_udp_bench/src/config.rs @@ -2,6 +2,7 @@ use aquatic_toml_config::TomlConfig; use serde::Deserialize; #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(deny_unknown_fields)] pub struct BenchConfig { pub num_rounds: usize, pub num_threads: usize, diff --git a/aquatic_udp_load_test/src/config.rs b/aquatic_udp_load_test/src/config.rs index a189307..b2566df 100644 --- a/aquatic_udp_load_test/src/config.rs +++ b/aquatic_udp_load_test/src/config.rs @@ -9,7 +9,7 @@ use aquatic_toml_config::TomlConfig; /// aquatic_udp_load_test configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct Config { /// Server address /// @@ -43,7 +43,7 @@ impl Default for Config { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct NetworkConfig { /// True means bind to one localhost IP per socket. /// @@ -86,7 +86,7 @@ impl Default for NetworkConfig { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct RequestConfig { /// Number of torrents to simulate pub number_of_torrents: usize, diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 136ab9f..7f051a1 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -10,7 +10,7 @@ use aquatic_toml_config::TomlConfig; /// aquatic_ws configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct Config { /// Socket workers receive requests from the socket, parse them and send /// them on to the swarm workers. They then receive responses from the @@ -51,7 +51,7 @@ impl aquatic_common::cli::Config for Config { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct NetworkConfig { /// Bind to this address pub address: SocketAddr, @@ -86,7 +86,7 @@ impl Default for NetworkConfig { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct ProtocolConfig { /// Maximum number of torrents to accept in scrape request pub max_scrape_torrents: usize, @@ -107,7 +107,7 @@ impl Default for ProtocolConfig { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct CleaningConfig { /// Clean peers this often (seconds) pub torrent_cleaning_interval: u64, diff --git a/aquatic_ws_load_test/src/config.rs b/aquatic_ws_load_test/src/config.rs index 9af7baf..370769d 100644 --- a/aquatic_ws_load_test/src/config.rs +++ b/aquatic_ws_load_test/src/config.rs @@ -7,7 +7,7 @@ use serde::Deserialize; /// aquatic_ws_load_test configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct Config { pub server_address: SocketAddr, pub log_level: LogLevel, @@ -41,7 +41,7 @@ impl Default for Config { } #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] +#[serde(default, deny_unknown_fields)] pub struct TorrentConfig { pub offers_per_request: usize, pub number_of_torrents: usize, From eba72dac632bad0548fb683c4f3c47a51ba578a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Jul 2022 11:27:36 +0200 Subject: [PATCH 3/8] Run cargo update Updating once_cell v1.12.0 -> v1.13.0 Updating simple_logger v2.1.0 -> v2.2.0 --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 879d688..65e83e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1834,9 +1834,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" +checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" [[package]] name = "oorandom" @@ -2457,9 +2457,9 @@ checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" [[package]] name = "simple_logger" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c75a9723083573ace81ad0cdfc50b858aa3c366c48636edb4109d73122a0c0ea" +checksum = "166fea527c36d9b8a0a88c0c6d4c5077c699d9ffb5cf890b231a3f08c35f3d40" dependencies = [ "atty", "colored", From b06c12e9a5a4586d19c1d8ccd99e8dfafa573765 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Jul 2022 11:38:50 +0200 Subject: [PATCH 4/8] Update TODO --- TODO.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/TODO.md b/TODO.md index a447dd7..1647d00 100644 --- a/TODO.md +++ b/TODO.md @@ -2,9 +2,6 @@ ## High priority -* udp: add IP blocklist, which would be more flexible than just adding option - for disallowing requests (claiming to be) from localhost - ## Medium priority * quit whole program if any thread panics @@ -17,6 +14,11 @@ * Run cargo-deny in CI +* udp: add IP blocklist, which would be more flexible than just adding option + for disallowing requests (claiming to be) from localhost + +* stagger cleaning tasks? + * aquatic_ws * remove peer from all torrent maps when connection is closed * store `Vec` in ConnectionReference, containing all used @@ -45,7 +47,6 @@ * aquatic_udp * what poll event capacity is actually needed? - * stagger connection cleaning intervals? * load test * move additional request sending to for each received response, maybe with probability 0.2 From b30da1a930e9dfc942e1efa154e644a64be158f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Jul 2022 12:03:51 +0200 Subject: [PATCH 5/8] ws: store peer_id and announced info hashes in ConnectionReference --- aquatic_ws/src/workers/socket.rs | 64 ++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 42dfb5f..8c04798 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -25,7 +25,7 @@ use glommio::net::{TcpListener, TcpStream}; use glommio::task::JoinHandle; use glommio::timer::{sleep, timeout, TimerActionRepeat}; use glommio::{enclose, prelude::*}; -use hashbrown::HashMap; +use hashbrown::{HashMap, HashSet}; use slab::Slab; use crate::config::Config; @@ -46,6 +46,8 @@ struct ConnectionReference { out_message_sender: Rc>, /// Updated after sending message to peer valid_until: ValidUntil, + peer_id: Option, + announced_info_hashes: HashSet, } pub async fn run_socket_worker( @@ -112,6 +114,8 @@ pub async fn run_socket_worker( task_handle: None, out_message_sender: out_message_sender.clone(), valid_until: ValidUntil::new(config.cleaning.max_connection_idle), + peer_id: None, + announced_info_hashes: Default::default(), }); ::log::info!("accepting stream: {}", key); @@ -240,10 +244,11 @@ async fn run_connection( let access_list_cache = create_access_list_cache(&access_list); let reader_handle = spawn_local_into( - enclose!((config, pending_scrape_slab) async move { + enclose!((config, connection_slab, pending_scrape_slab) async move { let mut reader = ConnectionReader { config, access_list_cache, + connection_slab, in_message_senders, out_message_sender, pending_scrape_slab, @@ -289,6 +294,7 @@ async fn run_connection( struct ConnectionReader { config: Rc, access_list_cache: AccessListCache, + connection_slab: Rc>>, in_message_senders: Rc>, out_message_sender: Rc>, pending_scrape_slab: Rc>>, @@ -320,7 +326,7 @@ impl ConnectionReader { Err(err) => { ::log::debug!("Couldn't parse in_message: {:?}", err); - self.send_error_response("Invalid request".into(), None) + self.send_error_response("Invalid request".into(), None, None) .await?; } } @@ -339,6 +345,39 @@ impl ConnectionReader { .load() .allows(self.config.access_list.mode, &info_hash.0) { + { + let mut connection_slab = self.connection_slab.borrow_mut(); + + let connection_reference = connection_slab + .get_mut(self.connection_id.0) + .ok_or_else(|| { + anyhow::anyhow!( + "connection reference {} not found in slab", + self.connection_id.0 + ) + })?; + + match &mut connection_reference.peer_id { + Some(peer_id) if *peer_id != announce_request.peer_id => { + self.send_error_response( + "Only one peer id can be used per connection".into(), + Some(ErrorResponseAction::Announce), + Some(info_hash), + ) + .await?; + return Err(anyhow::anyhow!("Peer used more than one PeerId")); + } + Some(_) => (), + opt_peer_id @ None => { + *opt_peer_id = Some(announce_request.peer_id); + } + } + + connection_reference + .announced_info_hashes + .insert(announce_request.info_hash); + } + let in_message = InMessage::AnnounceRequest(announce_request); let consumer_index = @@ -354,8 +393,12 @@ impl ConnectionReader { .unwrap(); ::log::info!("sent message to swarm worker"); } else { - self.send_error_response("Info hash not allowed".into(), Some(info_hash)) - .await?; + self.send_error_response( + "Info hash not allowed".into(), + Some(ErrorResponseAction::Announce), + Some(info_hash), + ) + .await?; } } InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => { @@ -364,8 +407,12 @@ impl ConnectionReader { } else { // If request.info_hashes is empty, don't return scrape for all // torrents, even though reference server does it. It is too expensive. - self.send_error_response("Full scrapes are not allowed".into(), None) - .await?; + self.send_error_response( + "Full scrapes are not allowed".into(), + Some(ErrorResponseAction::Scrape), + None, + ) + .await?; return Ok(()); }; @@ -415,10 +462,11 @@ impl ConnectionReader { async fn send_error_response( &self, failure_reason: Cow<'static, str>, + action: Option, info_hash: Option, ) -> anyhow::Result<()> { let out_message = OutMessage::ErrorResponse(ErrorResponse { - action: Some(ErrorResponseAction::Scrape), + action, failure_reason, info_hash, }); From 720ceacf997e32e9de368859d32471bfb2b0b69d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Jul 2022 13:13:53 +0200 Subject: [PATCH 6/8] ws: remove peer from all torrent maps when connection is closed --- TODO.md | 10 ++--- aquatic_ws/src/common.rs | 10 +++++ aquatic_ws/src/lib.rs | 5 +++ aquatic_ws/src/workers/socket.rs | 67 +++++++++++++++++++++++++++----- aquatic_ws/src/workers/swarm.rs | 59 ++++++++++++++++++++++++++++ 5 files changed, 134 insertions(+), 17 deletions(-) diff --git a/TODO.md b/TODO.md index 1647d00..5396993 100644 --- a/TODO.md +++ b/TODO.md @@ -20,13 +20,9 @@ * stagger cleaning tasks? * aquatic_ws - * remove peer from all torrent maps when connection is closed - * store `Vec` in ConnectionReference, containing all used - info hashes. When connection is closed, send - InMessage::ConnectionClosed or similar to request workers. - Storing PeerId in ConnectionReference will also be necessary, as - well as making sure clients only use a single one. Alternatively, - a HashMap> can be used for storage. + * Can peer IP address change after connection has been established + due to some kind of renegotition? It would cause issues. + * Add cleaning task for ConnectionHandle.announced_info_hashes? * RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity * replacing indexmap_amortized / simd_json with equivalents doesn't help * SinkExt::send maybe doesn't wake up properly? diff --git a/aquatic_ws/src/common.rs b/aquatic_ws/src/common.rs index 006ceb8..27aa134 100644 --- a/aquatic_ws/src/common.rs +++ b/aquatic_ws/src/common.rs @@ -4,6 +4,7 @@ use aquatic_common::access_list::AccessListArcSwap; use aquatic_common::CanonicalSocketAddr; pub use aquatic_common::ValidUntil; +use aquatic_ws_protocol::{InfoHash, PeerId}; #[derive(Default, Clone)] pub struct State { @@ -28,3 +29,12 @@ pub struct ConnectionMeta { pub peer_addr: CanonicalSocketAddr, pub pending_scrape_id: Option, } + +#[derive(Clone, Copy, Debug)] +pub enum SwarmControlMessage { + ConnectionClosed { + info_hash: InfoHash, + peer_id: PeerId, + peer_addr: CanonicalSocketAddr, + }, +} diff --git a/aquatic_ws/src/lib.rs b/aquatic_ws/src/lib.rs index daa77ed..66cc57b 100644 --- a/aquatic_ws/src/lib.rs +++ b/aquatic_ws/src/lib.rs @@ -36,6 +36,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); + let control_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); @@ -52,6 +53,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let config = config.clone(); let state = state.clone(); let tls_config = tls_config.clone(); + let control_mesh_builder = control_mesh_builder.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); let priv_dropper = priv_dropper.clone(); @@ -71,6 +73,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { config, state, tls_config, + control_mesh_builder, request_mesh_builder, response_mesh_builder, priv_dropper, @@ -86,6 +89,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); + let control_mesh_builder = control_mesh_builder.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); @@ -103,6 +107,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { sentinel, config, state, + control_mesh_builder, request_mesh_builder, response_mesh_builder, ) diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 8c04798..e21a72d 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -48,6 +48,7 @@ struct ConnectionReference { valid_until: ValidUntil, peer_id: Option, announced_info_hashes: HashSet, + peer_addr: CanonicalSocketAddr, } pub async fn run_socket_worker( @@ -55,6 +56,7 @@ pub async fn run_socket_worker( config: Config, state: State, tls_config: Arc, + control_message_mesh_builder: MeshBuilder, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, priv_dropper: PrivilegeDropper, @@ -64,6 +66,12 @@ pub async fn run_socket_worker( let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener"); + let (control_message_senders, _) = control_message_mesh_builder + .join(Role::Producer) + .await + .unwrap(); + let control_message_senders = Rc::new(control_message_senders); + let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap(); let in_message_senders = Rc::new(in_message_senders); @@ -107,6 +115,18 @@ pub async fn run_socket_worker( while let Some(stream) = incoming.next().await { match stream { Ok(stream) => { + let peer_addr = match stream.peer_addr() { + Ok(peer_addr) => CanonicalSocketAddr::new(peer_addr), + Err(err) => { + ::log::info!( + "could not extract peer address, closing connection: {:#}", + err + ); + + continue; + } + }; + let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE); let out_message_sender = Rc::new(out_message_sender); @@ -116,13 +136,14 @@ pub async fn run_socket_worker( valid_until: ValidUntil::new(config.cleaning.max_connection_idle), peer_id: None, announced_info_hashes: Default::default(), + peer_addr, }); ::log::info!("accepting stream: {}", key); - let task_handle = spawn_local_into(enclose!((config, access_list, in_message_senders, connection_slab, tls_config) async move { + let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, tls_config) async move { if let Err(err) = run_connection( - config, + config.clone(), access_list, in_message_senders, tq_prioritized, @@ -133,12 +154,40 @@ pub async fn run_socket_worker( out_message_consumer_id, ConnectionId(key), tls_config, - stream + stream, + peer_addr, ).await { ::log::debug!("Connection::run() error: {:?}", err); } - connection_slab.borrow_mut().try_remove(key); + // Remove reference in separate statement to avoid + // multiple RefCell borrows + let opt_reference = connection_slab.borrow_mut().try_remove(key); + + // Tell swarm workers to remove peer + if let Some(reference) = opt_reference { + if let Some(peer_id) = reference.peer_id { + for info_hash in reference.announced_info_hashes { + let message = SwarmControlMessage::ConnectionClosed { + info_hash, + peer_id, + peer_addr: reference.peer_addr, + }; + + let consumer_index = + calculate_in_message_consumer_index(&config, info_hash); + + // Only fails when receiver is closed + control_message_senders + .send_to( + consumer_index, + message + ) + .await + .unwrap(); + } + } + } }), tq_regular) .unwrap() .detach(); @@ -148,7 +197,7 @@ pub async fn run_socket_worker( } } Err(err) => { - ::log::error!("accept connection: {:?}", err); + ::log::error!("accept connection: {:#}", err); } } } @@ -221,12 +270,8 @@ async fn run_connection( connection_id: ConnectionId, tls_config: Arc, stream: TcpStream, + peer_addr: CanonicalSocketAddr, ) -> anyhow::Result<()> { - let peer_addr = stream - .peer_addr() - .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?; - let peer_addr = CanonicalSocketAddr::new(peer_addr); - let tls_acceptor: TlsAcceptor = tls_config.into(); let stream = tls_acceptor.accept(stream).await?; @@ -357,6 +402,7 @@ impl ConnectionReader { ) })?; + // Store peer id / check if stored peer id matches match &mut connection_reference.peer_id { Some(peer_id) if *peer_id != announce_request.peer_id => { self.send_error_response( @@ -373,6 +419,7 @@ impl ConnectionReader { } } + // Remember info hash for later connection_reference .announced_info_hashes .insert(announce_request.info_hash); diff --git a/aquatic_ws/src/workers/swarm.rs b/aquatic_ws/src/workers/swarm.rs index 87cdd48..0ea07ac 100644 --- a/aquatic_ws/src/workers/swarm.rs +++ b/aquatic_ws/src/workers/swarm.rs @@ -68,6 +68,22 @@ impl Default for TorrentData { } } +impl TorrentData { + pub fn remove_peer(&mut self, peer_id: PeerId) { + if let Some(peer) = self.peers.remove(&peer_id) { + match peer.status { + PeerStatus::Leeching => { + self.num_leechers -= 1; + } + PeerStatus::Seeding => { + self.num_seeders -= 1; + } + PeerStatus::Stopped => (), + } + } + } +} + type TorrentMap = AmortizedIndexMap; #[derive(Default)] @@ -131,9 +147,15 @@ pub async fn run_swarm_worker( _sentinel: PanicSentinel, config: Config, state: State, + control_message_mesh_builder: MeshBuilder, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, ) { + let (_, mut control_message_receivers) = control_message_mesh_builder + .join(Role::Consumer) + .await + .unwrap(); + let (_, mut in_message_receivers) = in_message_mesh_builder.join(Role::Consumer).await.unwrap(); let (out_message_senders, _) = out_message_mesh_builder.join(Role::Producer).await.unwrap(); @@ -153,6 +175,13 @@ pub async fn run_swarm_worker( let mut handles = Vec::new(); + for (_, receiver) in control_message_receivers.streams() { + let handle = + spawn_local(handle_control_message_stream(torrents.clone(), receiver)).detach(); + + handles.push(handle); + } + for (_, receiver) in in_message_receivers.streams() { let handle = spawn_local(handle_request_stream( config.clone(), @@ -170,6 +199,36 @@ pub async fn run_swarm_worker( } } +async fn handle_control_message_stream(torrents: Rc>, mut stream: S) +where + S: futures_lite::Stream + ::std::marker::Unpin, +{ + while let Some(message) = stream.next().await { + match message { + SwarmControlMessage::ConnectionClosed { + info_hash, + peer_id, + peer_addr, + } => { + ::log::debug!( + "Removing peer {} from torrents because connection was closed", + peer_addr.get() + ); + + if peer_addr.is_ipv4() { + if let Some(torrent_data) = torrents.borrow_mut().ipv4.get_mut(&info_hash) { + torrent_data.remove_peer(peer_id); + } + } else { + if let Some(torrent_data) = torrents.borrow_mut().ipv6.get_mut(&info_hash) { + torrent_data.remove_peer(peer_id); + } + } + } + } + } +} + async fn handle_request_stream( config: Config, torrents: Rc>, From 91573f47b5b6aadee3ddb2d06d031a009ba43f0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Jul 2022 13:25:33 +0200 Subject: [PATCH 7/8] README: link to explodie tracker stats, change requests/s number --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index dd277a1..20dc68f 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ Features at a glance: Known users: -- [explodie.org public tracker](https://explodie.org/opentracker.html) (`udp://explodie.org:6969`), typically serving ~100,000 requests per second +- [explodie.org public tracker](https://explodie.org/opentracker.html) (`udp://explodie.org:6969`), typically [serving ~80,000 requests per second](https://explodie.org/tracker-stats.html) ## Usage From 4cbc04d08728bf7656158645320c912e3f377159 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Jul 2022 13:30:05 +0200 Subject: [PATCH 8/8] Fix CI transfer test aquatic_ws config --- .github/actions/test-transfer/entrypoint.sh | 7 ------- 1 file changed, 7 deletions(-) diff --git a/.github/actions/test-transfer/entrypoint.sh b/.github/actions/test-transfer/entrypoint.sh index 3b82149..1337eca 100755 --- a/.github/actions/test-transfer/entrypoint.sh +++ b/.github/actions/test-transfer/entrypoint.sh @@ -82,15 +82,8 @@ echo "log_level = 'trace' [network] address = '127.0.0.1:3002' - -# glommio tls_certificate_path = './cert.crt' tls_private_key_path = './key.pk8' - -# mio -use_tls = true -tls_pkcs12_path = './identity.pfx' -tls_pkcs12_password = 'p' " > ws.toml ./target/debug/aquatic ws -c ws.toml > "$HOME/wss.log" 2>&1 &