diff --git a/Cargo.lock b/Cargo.lock index f62e009..2cc6eea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -230,6 +230,7 @@ dependencies = [ "rand", "rand_distr", "serde", + "serde_json", "slab", "tungstenite", ] diff --git a/aquatic_ws_load_test/Cargo.toml b/aquatic_ws_load_test/Cargo.toml index c9a936a..fedf02d 100644 --- a/aquatic_ws_load_test/Cargo.toml +++ b/aquatic_ws_load_test/Cargo.toml @@ -18,6 +18,7 @@ mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } rand = { version = "0.7", features = ["small_rng"] } rand_distr = "0.2" serde = { version = "1", features = ["derive"] } +serde_json = "1" slab = "0.4" tungstenite = "0.11" diff --git a/aquatic_ws_load_test/src/common.rs b/aquatic_ws_load_test/src/common.rs index 8058c15..282a31e 100644 --- a/aquatic_ws_load_test/src/common.rs +++ b/aquatic_ws_load_test/src/common.rs @@ -19,6 +19,7 @@ pub struct Statistics { pub requests: AtomicUsize, pub response_peers: AtomicUsize, pub responses_announce: AtomicUsize, + pub responses_offer: AtomicUsize, pub responses_scrape: AtomicUsize, pub responses_failure: AtomicUsize, pub bytes_sent: AtomicUsize, diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs index 9fadbc3..e35be1f 100644 --- a/aquatic_ws_load_test/src/main.rs +++ b/aquatic_ws_load_test/src/main.rs @@ -102,6 +102,8 @@ fn monitor_statistics( let requests_per_second = statistics.requests .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + let responses_offer_per_second = statistics.responses_offer + .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; let responses_scrape_per_second = statistics.responses_scrape .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; let responses_failure_per_second = statistics.responses_failure @@ -116,6 +118,7 @@ fn monitor_statistics( let responses_per_second = responses_announce_per_second + + responses_offer_per_second + responses_scrape_per_second + responses_failure_per_second; @@ -125,6 +128,7 @@ fn monitor_statistics( println!("Requests out: {:.2}/second", requests_per_second); println!("Responses in: {:.2}/second", responses_per_second); println!(" - Announce responses: {:.2}", responses_announce_per_second); + println!(" - Offer responses: {:.2}", responses_offer_per_second); println!(" - Scrape responses: {:.2}", responses_scrape_per_second); println!(" - Failure responses: {:.2}", responses_failure_per_second); //println!("Peers per announce response: {:.2}", response_peers / responses_announce); diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs index 4810428..07fe203 100644 --- a/aquatic_ws_load_test/src/network.rs +++ b/aquatic_ws_load_test/src/network.rs @@ -1,6 +1,6 @@ use std::sync::atomic::Ordering; use std::time::Duration; -use std::io::{Read, Write, ErrorKind, Cursor}; +use std::io::ErrorKind; use hashbrown::HashMap; use mio::{net::TcpStream, Events, Poll, Interest, Token}; @@ -12,9 +12,6 @@ use crate::config::*; use crate::utils::create_random_request; -type HandshakeResult = std::result::Result<(tungstenite::protocol::WebSocket, T), tungstenite::handshake::HandshakeError>>; - - pub enum ConnectionState { TcpStream(TcpStream), WebSocket(WebSocket), @@ -120,41 +117,56 @@ impl Connection { rng: &mut impl Rng, ){ if let ConnectionState::WebSocket(ref mut ws) = self.stream { + let mut send_request = false; + loop { match ws.read_message(){ Ok(message) => { - Self::register_response_type(state, message); - - break; + send_request |= Self::register_response_type(state, message); }, Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => { self.can_send_initial = false; - eprintln!("handle_read_event error would block: {}", err); - - return; + break; }, Err(err) => { eprintln!("handle_read_event error: {}", err); - return; + break; } } }; - self.send_request( - config, - state, - rng, - ); + if send_request { + self.send_request( + config, + state, + rng, + ); + } } } fn register_response_type( state: &LoadTestState, message: ::tungstenite::Message, - ){ - state.statistics.responses_announce.fetch_add(1, Ordering::SeqCst); // FIXME + ) -> bool { + if let ::tungstenite::Message::Text(text) = message { + if text.contains("offer"){ + state.statistics.responses_offer + .fetch_add(1, Ordering::SeqCst); + + return false; + } else if text.contains("announce"){ + state.statistics.responses_announce + .fetch_add(1, Ordering::SeqCst); + } else if text.contains("scrape"){ + state.statistics.responses_scrape + .fetch_add(1, Ordering::SeqCst); + } + } + + true } pub fn send_request( diff --git a/aquatic_ws_load_test/src/utils.rs b/aquatic_ws_load_test/src/utils.rs index 138d6f7..64c889b 100644 --- a/aquatic_ws_load_test/src/utils.rs +++ b/aquatic_ws_load_test/src/utils.rs @@ -57,13 +57,22 @@ fn create_announce_request( let info_hash_index = select_info_hash_index(config, &state, rng); + let mut offers = Vec::with_capacity(10); + + for _ in 0..10 { + offers.push(AnnounceRequestOffer { + offer_id: OfferId(rng.gen()), + offer: JsonValue(::serde_json::Value::from("{}")), + }) + } + InMessage::AnnounceRequest(AnnounceRequest { info_hash: state.info_hashes[info_hash_index], peer_id: PeerId(rng.gen()), bytes_left: Some(bytes_left), event, numwant: None, - offers: None, // FIXME + offers: Some(offers), answer: None, to_peer_id: None, offer_id: None,