aquatic_ws: add InMessage::from_ws_message, use in load test

This commit is contained in:
Joakim Frostegård 2020-08-01 22:45:19 +02:00
parent f7eae860d3
commit 6f2dc6a510
4 changed files with 56 additions and 29 deletions

1
Cargo.lock generated
View file

@ -239,6 +239,7 @@ dependencies = [
name = "aquatic_ws_protocol"
version = "0.1.0"
dependencies = [
"anyhow",
"hashbrown",
"serde",
"serde_json",

View file

@ -125,39 +125,39 @@ impl Connection {
loop {
match ws.read_message(){
Ok(message) => {
if let ::tungstenite::Message::Text(text) = message {
if text.contains("answer"){
match OutMessage::from_ws_message(message){
Ok(OutMessage::Offer(offer)) => {
state.statistics.responses_offer
.fetch_add(1, Ordering::SeqCst);
self.send_answer = Some((
offer.peer_id,
offer.offer_id
));
send_random_request = true;
},
Ok(OutMessage::Answer(_)) => {
state.statistics.responses_answer
.fetch_add(1, Ordering::SeqCst);
} else if text.contains("offer"){
// If message is an offer, send an answer but
// no new offers in next announce request.
let res_offer: Result<MiddlemanOfferToPeer, _> = ::serde_json::from_str(&text);
match res_offer {
Ok(offer) => {
state.statistics.responses_offer
.fetch_add(1, Ordering::SeqCst);
self.send_answer = Some((
offer.peer_id,
offer.offer_id
));
},
Err(err) => {
eprintln!("error decoding offer: {:?}", err);
}
}
} else if text.contains("interval"){
send_random_request = true;
},
Ok(OutMessage::AnnounceResponse(_)) => {
state.statistics.responses_announce
.fetch_add(1, Ordering::SeqCst);
} else if text.contains("scrape"){
send_random_request = true;
},
Ok(OutMessage::ScrapeResponse(_)) => {
state.statistics.responses_scrape
.fetch_add(1, Ordering::SeqCst);
}
send_random_request = true;
send_random_request = true;
},
Err(err) => {
eprintln!("error deserializing offer: {:?}", err);
}
}
},
Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => {

View file

@ -9,6 +9,7 @@ license = "Apache-2.0"
name = "aquatic_ws_protocol"
[dependencies]
anyhow = "1"
hashbrown = { version = "0.8", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"

View file

@ -81,7 +81,7 @@ pub struct MiddlemanOfferToPeer {
/// If announce request has answer = true, send this to peer with
/// peer id == "to_peer_id" field
/// Action field should be 'announce'
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MiddlemanAnswerToPeer {
/// Note: if equal to client peer_id, client ignores answer
pub peer_id: PeerId,
@ -134,7 +134,7 @@ pub struct AnnounceRequest {
}
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnnounceResponse {
pub info_hash: InfoHash,
/// Client checks if this is null, not clear why
@ -159,7 +159,7 @@ pub struct ScrapeRequest {
}
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScrapeStatistics {
pub complete: usize,
pub incomplete: usize,
@ -167,7 +167,7 @@ pub struct ScrapeStatistics {
}
#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScrapeResponse {
pub files: HashMap<InfoHash, ScrapeStatistics>,
// Looks like `flags` field is ignored in reference client
@ -297,4 +297,29 @@ impl OutMessage {
tungstenite::Message::from(json)
}
#[inline]
pub fn from_ws_message(
message: ::tungstenite::Message
) -> ::anyhow::Result<Self> {
use tungstenite::Message::{Text, Binary};
let text = match message {
Text(text) => text,
Binary(bytes) => String::from_utf8(bytes)?,
_ => return Err(anyhow::anyhow!("message type not supported")),
};
if text.contains("answer"){
Ok(Self::Answer(::serde_json::from_str(&text)?))
} else if text.contains("offer"){
Ok(Self::Offer(::serde_json::from_str(&text)?))
} else if text.contains("interval"){
Ok(Self::AnnounceResponse(::serde_json::from_str(&text)?))
} else if text.contains("scrape"){
Ok(Self::ScrapeResponse(::serde_json::from_str(&text)?))
} else {
Err(anyhow::anyhow!("Could not determine response type"))
}
}
}