From 1d5eb0dff9aeee25b9322e27f837c491b2daf9d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 16 Oct 2021 01:18:23 +0200 Subject: [PATCH] aquatic_ws: add ErrorResponse, send it when info hash is not allowed --- aquatic_ws/src/lib/handler.rs | 10 +++++++--- aquatic_ws_load_test/src/common.rs | 1 + aquatic_ws_load_test/src/main.rs | 6 +++++- aquatic_ws_load_test/src/network.rs | 10 ++++++++++ aquatic_ws_protocol/src/lib.rs | 20 ++++++++++++++++++++ 5 files changed, 43 insertions(+), 4 deletions(-) diff --git a/aquatic_ws/src/lib/handler.rs b/aquatic_ws/src/lib/handler.rs index e140a74..4fe1685 100644 --- a/aquatic_ws/src/lib/handler.rs +++ b/aquatic_ws/src/lib/handler.rs @@ -104,10 +104,14 @@ pub fn handle_announce_requests( .allows(config.access_list.mode, &request.info_hash.0); if !info_hash_allowed { - // let response = OutMessage::ErrorResponse(); + let response = OutMessage::ErrorResponse(ErrorResponse { + failure_reason: "Info hash not allowed".into(), + action: Some(ErrorResponseAction::Announce), + info_hash: Some(request.info_hash), + }); - // out_message_sender.send(request_sender_meta, response); - // wake_socket_workers[request_sender_meta.worker_index] = true; + out_message_sender.send(request_sender_meta, response); + wake_socket_workers[request_sender_meta.worker_index] = true; continue; } diff --git a/aquatic_ws_load_test/src/common.rs b/aquatic_ws_load_test/src/common.rs index 3b36011..200d04f 100644 --- a/aquatic_ws_load_test/src/common.rs +++ b/aquatic_ws_load_test/src/common.rs @@ -12,6 +12,7 @@ pub struct Statistics { pub responses_offer: AtomicUsize, pub responses_answer: AtomicUsize, pub responses_scrape: AtomicUsize, + pub responses_error: AtomicUsize, } #[derive(Clone)] diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 68b5c89..27c51a8 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -85,13 +85,16 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { statistics.responses_answer.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; let responses_scrape_per_second = statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + let responses_error_per_second = + statistics.responses_error.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; let responses_announce_per_second = responses_announce / interval_f64; let responses_per_second = responses_announce_per_second + responses_offer_per_second + responses_answer_per_second - + responses_scrape_per_second; + + responses_scrape_per_second + + responses_error_per_second; report_avg_response_vec.push(responses_per_second); @@ -105,6 +108,7 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { println!(" - Offer responses: {:.2}", responses_offer_per_second); println!(" - Answer responses: {:.2}", responses_answer_per_second); println!(" - Scrape responses: {:.2}", responses_scrape_per_second); + println!(" - Error responses: {:.2}", responses_error_per_second); let time_elapsed = start_time.elapsed(); let duration = Duration::from_secs(config.duration as u64); diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs index 1611fb5..38c8713 100644 --- a/aquatic_ws_load_test/src/network.rs +++ b/aquatic_ws_load_test/src/network.rs @@ -151,6 +151,16 @@ impl Connection { self.can_send = true; } + Ok(OutMessage::ErrorResponse(response)) => { + state + .statistics + .responses_error + .fetch_add(1, Ordering::SeqCst); + + eprintln!("received error response: {:?}", response.failure_reason); + + self.can_send = true; + }, Err(err) => { eprintln!("error deserializing offer: {:?}", err); } diff --git a/aquatic_ws_protocol/src/lib.rs b/aquatic_ws_protocol/src/lib.rs index 8af7c15..62d6d58 100644 --- a/aquatic_ws_protocol/src/lib.rs +++ b/aquatic_ws_protocol/src/lib.rs @@ -1,3 +1,5 @@ +use std::borrow::Cow; + use anyhow::Context; use hashbrown::HashMap; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -221,6 +223,23 @@ pub struct ScrapeResponse { // pub flags: HashMap, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum ErrorResponseAction { + Announce, + Scrape, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ErrorResponse { + pub failure_reason: Cow<'static, str>, + /// Action of original request + #[serde(skip_serializing_if = "Option::is_none")] + pub action: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub info_hash: Option, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(untagged)] pub enum InMessage { @@ -255,6 +274,7 @@ pub enum OutMessage { Answer(MiddlemanAnswerToPeer), AnnounceResponse(AnnounceResponse), ScrapeResponse(ScrapeResponse), + ErrorResponse(ErrorResponse), } impl OutMessage {