aquatic ws load test: send answers, count answer responses

This commit is contained in:
Joakim Frostegård 2020-08-01 05:57:52 +02:00
parent 364606a025
commit b0d2f67ab6
4 changed files with 89 additions and 43 deletions

View file

@ -20,6 +20,7 @@ pub struct Statistics {
pub response_peers: AtomicUsize, pub response_peers: AtomicUsize,
pub responses_announce: AtomicUsize, pub responses_announce: AtomicUsize,
pub responses_offer: AtomicUsize, pub responses_offer: AtomicUsize,
pub responses_answer: AtomicUsize,
pub responses_scrape: AtomicUsize, pub responses_scrape: AtomicUsize,
pub responses_failure: AtomicUsize, pub responses_failure: AtomicUsize,
pub bytes_sent: AtomicUsize, pub bytes_sent: AtomicUsize,

View file

@ -104,6 +104,8 @@ fn monitor_statistics(
.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_offer_per_second = statistics.responses_offer let responses_offer_per_second = statistics.responses_offer
.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_answer_per_second = statistics.responses_answer
.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_scrape_per_second = statistics.responses_scrape let responses_scrape_per_second = statistics.responses_scrape
.fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64;
let responses_failure_per_second = statistics.responses_failure let responses_failure_per_second = statistics.responses_failure
@ -119,6 +121,7 @@ fn monitor_statistics(
let responses_per_second = let responses_per_second =
responses_announce_per_second + responses_announce_per_second +
responses_offer_per_second + responses_offer_per_second +
responses_answer_per_second +
responses_scrape_per_second + responses_scrape_per_second +
responses_failure_per_second; responses_failure_per_second;
@ -129,11 +132,12 @@ fn monitor_statistics(
println!("Responses in: {:.2}/second", responses_per_second); println!("Responses in: {:.2}/second", responses_per_second);
println!(" - Announce responses: {:.2}", responses_announce_per_second); println!(" - Announce responses: {:.2}", responses_announce_per_second);
println!(" - Offer responses: {:.2}", responses_offer_per_second); println!(" - Offer responses: {:.2}", responses_offer_per_second);
println!(" - Answer responses: {:.2}", responses_answer_per_second);
println!(" - Scrape responses: {:.2}", responses_scrape_per_second); println!(" - Scrape responses: {:.2}", responses_scrape_per_second);
println!(" - Failure responses: {:.2}", responses_failure_per_second); // println!(" - Failure responses: {:.2}", responses_failure_per_second);
//println!("Peers per announce response: {:.2}", response_peers / responses_announce); // println!("Peers per announce response: {:.2}", response_peers / responses_announce);
println!("Bandwidth out: {:.2}Mbit/s", bytes_sent_per_second * MBITS_FACTOR); // println!("Bandwidth out: {:.2}Mbit/s", bytes_sent_per_second * MBITS_FACTOR);
println!("Bandwidth in: {:.2}Mbit/s", bytes_received_per_second * MBITS_FACTOR); // println!("Bandwidth in: {:.2}Mbit/s", bytes_received_per_second * MBITS_FACTOR);
let time_elapsed = start_time.elapsed(); let time_elapsed = start_time.elapsed();
let duration = Duration::from_secs(config.duration as u64); let duration = Duration::from_secs(config.duration as u64);

View file

@ -68,7 +68,8 @@ impl ConnectionState {
pub struct Connection { pub struct Connection {
stream: ConnectionState, stream: ConnectionState,
can_send_initial: bool, can_send_initial: bool,
marked_as_complete: bool marked_as_complete: bool,
send_answer: Option<(PeerId, OfferId)>,
} }
@ -89,6 +90,7 @@ impl Connection {
stream: ConnectionState::TcpStream(stream), stream: ConnectionState::TcpStream(stream),
can_send_initial: false, can_send_initial: false,
marked_as_complete: false, marked_as_complete: false,
send_answer: None,
}; };
connections.insert(*token_counter, connection); connections.insert(*token_counter, connection);
@ -104,6 +106,7 @@ impl Connection {
stream, stream,
can_send_initial: self.can_send_initial, can_send_initial: self.can_send_initial,
marked_as_complete: false, marked_as_complete: false,
send_answer: self.send_answer,
}) })
} else { } else {
None None
@ -117,12 +120,51 @@ impl Connection {
rng: &mut impl Rng, rng: &mut impl Rng,
){ ){
if let ConnectionState::WebSocket(ref mut ws) = self.stream { if let ConnectionState::WebSocket(ref mut ws) = self.stream {
let mut send_request = false; let mut send_random_request = false;
loop { loop {
match ws.read_message(){ match ws.read_message(){
Ok(message) => { Ok(message) => {
send_request |= Self::register_response_type(state, message); if let ::tungstenite::Message::Text(text) = message {
if text.contains("answer"){
state.statistics.responses_answer
.fetch_add(1, Ordering::SeqCst);
send_random_request = true;
} else if text.contains("offer"){
// If message is an offer, don't send random
// request in return, since that would cause
// exponential growth of number of requests.
// However, add an answer to next request.
let res_offer: Result<MiddlemanOfferToPeer, _> = ::serde_json::from_str(&text);
match res_offer {
Ok(offer) => {
state.statistics.responses_offer
.fetch_add(1, Ordering::SeqCst);
self.send_answer = Some((
offer.peer_id,
offer.offer_id
));
},
Err(err) => {
eprintln!("error decoding offer: {:?}", err);
}
}
} else if text.contains("interval"){
state.statistics.responses_announce
.fetch_add(1, Ordering::SeqCst);
send_random_request = true;
} else if text.contains("scrape"){
state.statistics.responses_scrape
.fetch_add(1, Ordering::SeqCst);
send_random_request = true;
}
}
}, },
Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => { Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => {
self.can_send_initial = false; self.can_send_initial = false;
@ -137,8 +179,8 @@ impl Connection {
} }
}; };
if send_request { if send_random_request {
self.send_request( self.send_random_request(
config, config,
state, state,
rng, rng,
@ -147,44 +189,43 @@ impl Connection {
} }
} }
fn register_response_type( pub fn send_random_request(
state: &LoadTestState,
message: ::tungstenite::Message,
) -> bool {
if let ::tungstenite::Message::Text(text) = message {
if text.contains("offer"){
state.statistics.responses_offer
.fetch_add(1, Ordering::SeqCst);
return false;
} else if text.contains("announce"){
state.statistics.responses_announce
.fetch_add(1, Ordering::SeqCst);
} else if text.contains("scrape"){
state.statistics.responses_scrape
.fetch_add(1, Ordering::SeqCst);
}
}
true
}
pub fn send_request(
&mut self, &mut self,
config: &Config, config: &Config,
state: &LoadTestState, state: &LoadTestState,
rng: &mut impl Rng, rng: &mut impl Rng,
){
let request = create_random_request(
&config,
&state,
rng
);
// Add offer answer data if applicable
let request = if let InMessage::AnnounceRequest(mut r) = request {
if let Some((peer_id, offer_id)) = self.send_answer {
r.to_peer_id = Some(peer_id);
r.offer_id = Some(offer_id);
r.answer = Some(JsonValue(::serde_json::Value::from("{}")));
}
self.send_answer = None;
InMessage::AnnounceRequest(r)
} else {
request
};
self.send_request(state, request)
}
fn send_request(
&mut self,
state: &LoadTestState,
request: InMessage
){ ){
if let ConnectionState::WebSocket(ref mut ws) = self.stream { if let ConnectionState::WebSocket(ref mut ws) = self.stream {
let request = create_random_request( match ws.write_message(request.to_ws_message()){
&config,
&state,
rng
);
let message = request.to_ws_message();
match ws.write_message(message){
Ok(_) => { Ok(_) => {
state.statistics.requests.fetch_add(1, Ordering::SeqCst); state.statistics.requests.fetch_add(1, Ordering::SeqCst);
}, },
@ -302,7 +343,7 @@ pub fn run_socket_thread(
for (_, connection) in connections.iter_mut(){ for (_, connection) in connections.iter_mut(){
if connection.can_send_initial { if connection.can_send_initial {
connection.send_request( connection.send_random_request(
config, config,
&state, &state,
&mut rng, &mut rng,

View file

@ -65,7 +65,7 @@ impl Default for AnnounceEvent {
/// Apparently, these are sent to a number of peers when they are set /// Apparently, these are sent to a number of peers when they are set
/// in an AnnounceRequest /// in an AnnounceRequest
/// action = "announce" /// action = "announce"
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MiddlemanOfferToPeer { pub struct MiddlemanOfferToPeer {
/// Peer id of peer sending offer /// Peer id of peer sending offer
/// Note: if equal to client peer_id, client ignores offer /// Note: if equal to client peer_id, client ignores offer