From 6f2dc6a51031908873fd825334076c41176763f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 1 Aug 2020 22:45:19 +0200 Subject: [PATCH] aquatic_ws: add InMessage::from_ws_message, use in load test --- Cargo.lock | 1 + aquatic_ws_load_test/src/network.rs | 50 ++++++++++++++--------------- aquatic_ws_protocol/Cargo.toml | 1 + aquatic_ws_protocol/src/lib.rs | 33 ++++++++++++++++--- 4 files changed, 56 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a02de5b..8f67588 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -239,6 +239,7 @@ dependencies = [ name = "aquatic_ws_protocol" version = "0.1.0" dependencies = [ + "anyhow", "hashbrown", "serde", "serde_json", diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs index 4bc71d2..4854958 100644 --- a/aquatic_ws_load_test/src/network.rs +++ b/aquatic_ws_load_test/src/network.rs @@ -125,39 +125,39 @@ impl Connection { loop { match ws.read_message(){ Ok(message) => { - if let ::tungstenite::Message::Text(text) = message { - if text.contains("answer"){ + match OutMessage::from_ws_message(message){ + Ok(OutMessage::Offer(offer)) => { + state.statistics.responses_offer + .fetch_add(1, Ordering::SeqCst); + + self.send_answer = Some(( + offer.peer_id, + offer.offer_id + )); + + send_random_request = true; + }, + Ok(OutMessage::Answer(_)) => { state.statistics.responses_answer .fetch_add(1, Ordering::SeqCst); - } else if text.contains("offer"){ - // If message is an offer, send an answer but - // no new offers in next announce request. - let res_offer: Result = ::serde_json::from_str(&text); - - match res_offer { - Ok(offer) => { - state.statistics.responses_offer - .fetch_add(1, Ordering::SeqCst); - - self.send_answer = Some(( - offer.peer_id, - offer.offer_id - )); - }, - Err(err) => { - eprintln!("error decoding offer: {:?}", err); - } - } - } else if text.contains("interval"){ + send_random_request = true; + }, + Ok(OutMessage::AnnounceResponse(_)) => { state.statistics.responses_announce .fetch_add(1, Ordering::SeqCst); - } else if text.contains("scrape"){ + + send_random_request = true; + }, + Ok(OutMessage::ScrapeResponse(_)) => { state.statistics.responses_scrape .fetch_add(1, Ordering::SeqCst); - } - send_random_request = true; + send_random_request = true; + }, + Err(err) => { + eprintln!("error deserializing offer: {:?}", err); + } } }, Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => { diff --git a/aquatic_ws_protocol/Cargo.toml b/aquatic_ws_protocol/Cargo.toml index f729d84..f38da1f 100644 --- a/aquatic_ws_protocol/Cargo.toml +++ b/aquatic_ws_protocol/Cargo.toml @@ -9,6 +9,7 @@ license = "Apache-2.0" name = "aquatic_ws_protocol" [dependencies] +anyhow = "1" hashbrown = { version = "0.8", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/aquatic_ws_protocol/src/lib.rs b/aquatic_ws_protocol/src/lib.rs index 082722c..2915647 100644 --- a/aquatic_ws_protocol/src/lib.rs +++ b/aquatic_ws_protocol/src/lib.rs @@ -81,7 +81,7 @@ pub struct MiddlemanOfferToPeer { /// If announce request has answer = true, send this to peer with /// peer id == "to_peer_id" field /// Action field should be 'announce' -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct MiddlemanAnswerToPeer { /// Note: if equal to client peer_id, client ignores answer pub peer_id: PeerId, @@ -134,7 +134,7 @@ pub struct AnnounceRequest { } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct AnnounceResponse { pub info_hash: InfoHash, /// Client checks if this is null, not clear why @@ -159,7 +159,7 @@ pub struct ScrapeRequest { } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ScrapeStatistics { pub complete: usize, pub incomplete: usize, @@ -167,7 +167,7 @@ pub struct ScrapeStatistics { } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ScrapeResponse { pub files: HashMap, // Looks like `flags` field is ignored in reference client @@ -297,4 +297,29 @@ impl OutMessage { tungstenite::Message::from(json) } + + #[inline] + pub fn from_ws_message( + message: ::tungstenite::Message + ) -> ::anyhow::Result { + use tungstenite::Message::{Text, Binary}; + + let text = match message { + Text(text) => text, + Binary(bytes) => String::from_utf8(bytes)?, + _ => return Err(anyhow::anyhow!("message type not supported")), + }; + + if text.contains("answer"){ + Ok(Self::Answer(::serde_json::from_str(&text)?)) + } else if text.contains("offer"){ + Ok(Self::Offer(::serde_json::from_str(&text)?)) + } else if text.contains("interval"){ + Ok(Self::AnnounceResponse(::serde_json::from_str(&text)?)) + } else if text.contains("scrape"){ + Ok(Self::ScrapeResponse(::serde_json::from_str(&text)?)) + } else { + Err(anyhow::anyhow!("Could not determine response type")) + } + } } \ No newline at end of file