aquatic_ws: add ErrorResponse, send it when info hash is not allowed

This commit is contained in:
Joakim Frostegård 2021-10-16 01:18:23 +02:00
parent 28cc6c261a
commit 1d5eb0dff9
5 changed files with 43 additions and 4 deletions

View file

@ -104,10 +104,14 @@ pub fn handle_announce_requests(
.allows(config.access_list.mode, &request.info_hash.0); .allows(config.access_list.mode, &request.info_hash.0);
if !info_hash_allowed { if !info_hash_allowed {
// let response = OutMessage::ErrorResponse(); let response = OutMessage::ErrorResponse(ErrorResponse {
failure_reason: "Info hash not allowed".into(),
action: Some(ErrorResponseAction::Announce),
info_hash: Some(request.info_hash),
});
// out_message_sender.send(request_sender_meta, response); out_message_sender.send(request_sender_meta, response);
// wake_socket_workers[request_sender_meta.worker_index] = true; wake_socket_workers[request_sender_meta.worker_index] = true;
continue; continue;
} }

View file

@ -12,6 +12,7 @@ pub struct Statistics {
pub responses_offer: AtomicUsize, pub responses_offer: AtomicUsize,
pub responses_answer: AtomicUsize, pub responses_answer: AtomicUsize,
pub responses_scrape: AtomicUsize, pub responses_scrape: AtomicUsize,
pub responses_error: AtomicUsize,
} }
#[derive(Clone)] #[derive(Clone)]

View file

@ -85,13 +85,16 @@ fn monitor_statistics(state: LoadTestState, config: &Config) {
statistics.responses_answer.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; statistics.responses_answer.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_scrape_per_second = let responses_scrape_per_second =
statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; statistics.responses_scrape.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_error_per_second =
statistics.responses_error.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_announce_per_second = responses_announce / interval_f64; let responses_announce_per_second = responses_announce / interval_f64;
let responses_per_second = responses_announce_per_second let responses_per_second = responses_announce_per_second
+ responses_offer_per_second + responses_offer_per_second
+ responses_answer_per_second + responses_answer_per_second
+ responses_scrape_per_second; + responses_scrape_per_second
+ responses_error_per_second;
report_avg_response_vec.push(responses_per_second); report_avg_response_vec.push(responses_per_second);
@ -105,6 +108,7 @@ fn monitor_statistics(state: LoadTestState, config: &Config) {
println!(" - Offer responses: {:.2}", responses_offer_per_second); println!(" - Offer responses: {:.2}", responses_offer_per_second);
println!(" - Answer responses: {:.2}", responses_answer_per_second); println!(" - Answer responses: {:.2}", responses_answer_per_second);
println!(" - Scrape responses: {:.2}", responses_scrape_per_second); println!(" - Scrape responses: {:.2}", responses_scrape_per_second);
println!(" - Error responses: {:.2}", responses_error_per_second);
let time_elapsed = start_time.elapsed(); let time_elapsed = start_time.elapsed();
let duration = Duration::from_secs(config.duration as u64); let duration = Duration::from_secs(config.duration as u64);

View file

@ -151,6 +151,16 @@ impl Connection {
self.can_send = true; self.can_send = true;
} }
Ok(OutMessage::ErrorResponse(response)) => {
state
.statistics
.responses_error
.fetch_add(1, Ordering::SeqCst);
eprintln!("received error response: {:?}", response.failure_reason);
self.can_send = true;
},
Err(err) => { Err(err) => {
eprintln!("error deserializing offer: {:?}", err); eprintln!("error deserializing offer: {:?}", err);
} }

View file

@ -1,3 +1,5 @@
use std::borrow::Cow;
use anyhow::Context; use anyhow::Context;
use hashbrown::HashMap; use hashbrown::HashMap;
use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
@ -221,6 +223,23 @@ pub struct ScrapeResponse {
// pub flags: HashMap<String, usize>, // pub flags: HashMap<String, usize>,
} }
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ErrorResponseAction {
Announce,
Scrape,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ErrorResponse {
pub failure_reason: Cow<'static, str>,
/// Action of original request
#[serde(skip_serializing_if = "Option::is_none")]
pub action: Option<ErrorResponseAction>,
#[serde(skip_serializing_if = "Option::is_none")]
pub info_hash: Option<InfoHash>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)] #[serde(untagged)]
pub enum InMessage { pub enum InMessage {
@ -255,6 +274,7 @@ pub enum OutMessage {
Answer(MiddlemanAnswerToPeer), Answer(MiddlemanAnswerToPeer),
AnnounceResponse(AnnounceResponse), AnnounceResponse(AnnounceResponse),
ScrapeResponse(ScrapeResponse), ScrapeResponse(ScrapeResponse),
ErrorResponse(ErrorResponse),
} }
impl OutMessage { impl OutMessage {