diff --git a/Cargo.lock b/Cargo.lock index dfd95c5..f62e009 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,6 +195,7 @@ dependencies = [ "anyhow", "aquatic_cli_helpers", "aquatic_common", + "aquatic_ws_protocol", "crossbeam-channel", "either", "hashbrown", @@ -209,12 +210,40 @@ dependencies = [ "quickcheck_macros", "rand", "serde", - "serde_json", "simplelog", "socket2", "tungstenite", ] +[[package]] +name = "aquatic_ws_load_test" +version = "0.1.0" +dependencies = [ + "anyhow", + "aquatic_cli_helpers", + "aquatic_ws_protocol", + "hashbrown", + "mimalloc", + "mio", + "quickcheck", + "quickcheck_macros", + "rand", + "rand_distr", + "serde", + "slab", + "tungstenite", +] + +[[package]] +name = "aquatic_ws_protocol" +version = "0.1.0" +dependencies = [ + "hashbrown", + "serde", + "serde_json", + "tungstenite", +] + [[package]] name = "arrayvec" version = "0.4.12" diff --git a/Cargo.toml b/Cargo.toml index 3fdc42e..5011614 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,8 @@ members = [ "aquatic_udp_load_test", "aquatic_udp_protocol", "aquatic_ws", + "aquatic_ws_load_test", + "aquatic_ws_protocol", "plot_pareto" ] diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 8abf501..f4c3a40 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -17,6 +17,7 @@ path = "src/bin/main.rs" anyhow = "1" aquatic_cli_helpers = { path = "../aquatic_cli_helpers" } aquatic_common = { path = "../aquatic_common" } +aquatic_ws_protocol = { path = "../aquatic_ws_protocol" } crossbeam-channel = "0.4" either = "1" hashbrown = { version = "0.8", features = ["serde"] } @@ -29,7 +30,6 @@ parking_lot = "0.11" privdrop = "0.3" rand = { version = "0.7", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } -serde_json = "1" socket2 = { version = "0.3", features = ["reuseport"] } simplelog = "0.8" tungstenite = "0.11" diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs index 949ceaf..0cd1a07 100644 --- a/aquatic_ws/src/lib/common.rs +++ b/aquatic_ws/src/lib/common.rs @@ -10,7 +10,7 @@ use parking_lot::Mutex; pub use aquatic_common::ValidUntil; -use crate::protocol::*; +use aquatic_ws_protocol::*; #[derive(Clone, Copy, Debug)] diff --git a/aquatic_ws/src/lib/handler.rs b/aquatic_ws/src/lib/handler.rs index 0b6b68e..fc9fe20 100644 --- a/aquatic_ws/src/lib/handler.rs +++ b/aquatic_ws/src/lib/handler.rs @@ -6,10 +6,10 @@ use parking_lot::MutexGuard; use rand::{Rng, SeedableRng, rngs::SmallRng}; use aquatic_common::extract_response_peers; +use aquatic_ws_protocol::*; use crate::common::*; use crate::config::Config; -use crate::protocol::*; pub fn run_request_worker( diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index e21a77d..28d1b39 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -13,7 +13,6 @@ pub mod common; pub mod config; pub mod handler; pub mod network; -pub mod protocol; pub mod tasks; use common::*; diff --git a/aquatic_ws/src/lib/network/mod.rs b/aquatic_ws/src/lib/network/mod.rs index 7e2da6c..32562b9 100644 --- a/aquatic_ws/src/lib/network/mod.rs +++ b/aquatic_ws/src/lib/network/mod.rs @@ -10,10 +10,10 @@ use mio::net::TcpListener; use tungstenite::protocol::WebSocketConfig; use aquatic_common::convert_ipv4_mapped_ipv6; +use aquatic_ws_protocol::*; use crate::common::*; use crate::config::Config; -use crate::protocol::*; pub mod connection; pub mod utils; @@ -242,6 +242,8 @@ pub fn run_handshakes_and_read_messages( if stop_loop { break; } + } else { + break } } } diff --git a/aquatic_ws_load_test/Cargo.toml b/aquatic_ws_load_test/Cargo.toml new file mode 100644 index 0000000..c9a936a --- /dev/null +++ b/aquatic_ws_load_test/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "aquatic_ws_load_test" +version = "0.1.0" +authors = ["Joakim FrostegÄrd "] +edition = "2018" +license = "Apache-2.0" + +[[bin]] +name = "aquatic_ws_load_test" + +[dependencies] +anyhow = "1" +aquatic_cli_helpers = { path = "../aquatic_cli_helpers" } +aquatic_ws_protocol = { path = "../aquatic_ws_protocol" } +hashbrown = { version = "0.8", features = ["serde"] } +mimalloc = { version = "0.1", default-features = false } +mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } +rand = { version = "0.7", features = ["small_rng"] } +rand_distr = "0.2" +serde = { version = "1", features = ["derive"] } +slab = "0.4" +tungstenite = "0.11" + +[dev-dependencies] +quickcheck = "0.9" +quickcheck_macros = "0.9" diff --git a/aquatic_ws_load_test/src/common.rs b/aquatic_ws_load_test/src/common.rs new file mode 100644 index 0000000..8058c15 --- /dev/null +++ b/aquatic_ws_load_test/src/common.rs @@ -0,0 +1,41 @@ +use std::sync::{Arc, atomic::AtomicUsize}; + +use rand_distr::Pareto; + +pub use aquatic_ws_protocol::*; + + +#[derive(PartialEq, Eq, Clone)] +pub struct TorrentPeer { + pub info_hash: InfoHash, + pub scrape_hash_indeces: Vec, + pub peer_id: PeerId, + pub port: u16, +} + + +#[derive(Default)] +pub struct Statistics { + pub requests: AtomicUsize, + pub response_peers: AtomicUsize, + pub responses_announce: AtomicUsize, + pub responses_scrape: AtomicUsize, + pub responses_failure: AtomicUsize, + pub bytes_sent: AtomicUsize, + pub bytes_received: AtomicUsize, +} + + +#[derive(Clone)] +pub struct LoadTestState { + pub info_hashes: Arc>, + pub statistics: Arc, + pub pareto: Arc>, +} + + +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum RequestType { + Announce, + Scrape +} \ No newline at end of file diff --git a/aquatic_ws_load_test/src/config.rs b/aquatic_ws_load_test/src/config.rs new file mode 100644 index 0000000..84b92d1 --- /dev/null +++ b/aquatic_ws_load_test/src/config.rs @@ -0,0 +1,76 @@ +use std::net::SocketAddr; + +use serde::{Serialize, Deserialize}; + + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct Config { + pub server_address: SocketAddr, + pub num_workers: u8, + pub duration: usize, + pub network: NetworkConfig, + pub torrents: TorrentConfig, +} + + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct NetworkConfig { + pub poll_timeout_microseconds: u64, + pub poll_event_capacity: usize, +} + + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct TorrentConfig { + pub number_of_torrents: usize, + /// Pareto shape + /// + /// Fake peers choose torrents according to Pareto distribution. + pub torrent_selection_pareto_shape: f64, + /// Probability that a generated peer is a seeder + pub peer_seeder_probability: f64, + /// Probability that a generated request is a announce request, as part + /// of sum of the various weight arguments. + pub weight_announce: usize, + /// Probability that a generated request is a scrape request, as part + /// of sum of the various weight arguments. + pub weight_scrape: usize, +} + + +impl Default for Config { + fn default() -> Self { + Self { + server_address: "127.0.0.1:3000".parse().unwrap(), + num_workers: 1, + duration: 0, + network: NetworkConfig::default(), + torrents: TorrentConfig::default(), + } + } +} + +impl Default for NetworkConfig { + fn default() -> Self { + Self { + poll_timeout_microseconds: 1000, + poll_event_capacity: 4096, + } + } +} + + +impl Default for TorrentConfig { + fn default() -> Self { + Self { + number_of_torrents: 10_000, + peer_seeder_probability: 0.25, + torrent_selection_pareto_shape: 2.0, + weight_announce: 5, + weight_scrape: 0, + } + } +} diff --git a/aquatic_ws_load_test/src/main.rs b/aquatic_ws_load_test/src/main.rs new file mode 100644 index 0000000..9fadbc3 --- /dev/null +++ b/aquatic_ws_load_test/src/main.rs @@ -0,0 +1,156 @@ +use std::thread; +use std::sync::{Arc, atomic::Ordering}; +use std::time::{Duration, Instant}; + +use rand::prelude::*; +use rand_distr::Pareto; + +mod common; +mod config; +mod network; +mod utils; + +use common::*; +use config::*; +use network::*; + + +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; + + +/// Multiply bytes during a second with this to get Mbit/s +const MBITS_FACTOR: f64 = 1.0 / ((1024.0 * 1024.0) / 8.0); + + +pub fn main(){ + aquatic_cli_helpers::run_app_with_cli_and_config::( + "aquatic: udp bittorrent tracker: load tester", + run, + ) +} + + +fn run(config: Config) -> ::anyhow::Result<()> { + if config.torrents.weight_announce + config.torrents.weight_scrape == 0 { + panic!("Error: at least one weight must be larger than zero."); + } + + println!("Starting client with config: {:#?}", config); + + let mut info_hashes = Vec::with_capacity(config.torrents.number_of_torrents); + + let mut rng = SmallRng::from_entropy(); + + for _ in 0..config.torrents.number_of_torrents { + info_hashes.push(InfoHash(rng.gen())); + } + + let pareto = Pareto::new( + 1.0, + config.torrents.torrent_selection_pareto_shape + ).unwrap(); + + let state = LoadTestState { + info_hashes: Arc::new(info_hashes), + statistics: Arc::new(Statistics::default()), + pareto: Arc::new(pareto), + }; + + // Start socket workers + + for _ in 0..config.num_workers { + + let config = config.clone(); + let state = state.clone(); + + thread::spawn(move || run_socket_thread( + &config, + state, + 1 + )); + } + + monitor_statistics( + state, + &config + ); + + Ok(()) +} + + +fn monitor_statistics( + state: LoadTestState, + config: &Config, +){ + let start_time = Instant::now(); + let mut report_avg_response_vec: Vec = Vec::new(); + + let interval = 5; + let interval_f64 = interval as f64; + + loop { + thread::sleep(Duration::from_secs(interval)); + + let statistics = state.statistics.as_ref(); + + let responses_announce = statistics.responses_announce + .fetch_and(0, Ordering::SeqCst) as f64; + // let response_peers = statistics.response_peers + // .fetch_and(0, Ordering::SeqCst) as f64; + + let requests_per_second = statistics.requests + .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + let responses_scrape_per_second = statistics.responses_scrape + .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + let responses_failure_per_second = statistics.responses_failure + .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + + let bytes_sent_per_second = statistics.bytes_sent + .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + let bytes_received_per_second = statistics.bytes_received + .fetch_and(0, Ordering::SeqCst) as f64 / interval_f64; + + let responses_announce_per_second = responses_announce / interval_f64; + + let responses_per_second = + responses_announce_per_second + + responses_scrape_per_second + + responses_failure_per_second; + + report_avg_response_vec.push(responses_per_second); + + println!(); + println!("Requests out: {:.2}/second", requests_per_second); + println!("Responses in: {:.2}/second", responses_per_second); + println!(" - Announce responses: {:.2}", responses_announce_per_second); + println!(" - Scrape responses: {:.2}", responses_scrape_per_second); + println!(" - Failure responses: {:.2}", responses_failure_per_second); + //println!("Peers per announce response: {:.2}", response_peers / responses_announce); + println!("Bandwidth out: {:.2}Mbit/s", bytes_sent_per_second * MBITS_FACTOR); + println!("Bandwidth in: {:.2}Mbit/s", bytes_received_per_second * MBITS_FACTOR); + + let time_elapsed = start_time.elapsed(); + let duration = Duration::from_secs(config.duration as u64); + + if config.duration != 0 && time_elapsed >= duration { + let report_len = report_avg_response_vec.len() as f64; + let report_sum: f64 = report_avg_response_vec.into_iter().sum(); + let report_avg: f64 = report_sum / report_len; + + println!( + concat!( + "\n# aquatic load test report\n\n", + "Test ran for {} seconds.\n", + "Average responses per second: {:.2}\n\nConfig: {:#?}\n" + ), + time_elapsed.as_secs(), + report_avg, + config + ); + + break + } + } +} diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs new file mode 100644 index 0000000..4810428 --- /dev/null +++ b/aquatic_ws_load_test/src/network.rs @@ -0,0 +1,323 @@ +use std::sync::atomic::Ordering; +use std::time::Duration; +use std::io::{Read, Write, ErrorKind, Cursor}; + +use hashbrown::HashMap; +use mio::{net::TcpStream, Events, Poll, Interest, Token}; +use rand::{rngs::SmallRng, prelude::*}; +use tungstenite::{WebSocket, HandshakeError, ClientHandshake, handshake::MidHandshake}; + +use crate::common::*; +use crate::config::*; +use crate::utils::create_random_request; + + +type HandshakeResult = std::result::Result<(tungstenite::protocol::WebSocket, T), tungstenite::handshake::HandshakeError>>; + + +pub enum ConnectionState { + TcpStream(TcpStream), + WebSocket(WebSocket), + MidHandshake(MidHandshake>) +} + + +impl ConnectionState { + fn advance(self, config: &Config) -> Option { + match self { + Self::TcpStream(stream) => { + let req = format!( + "ws://{}:{}", + config.server_address.ip(), + config.server_address.port() + ); + + match ::tungstenite::client(req, stream){ + Ok((ws, _)) => { + Some(ConnectionState::WebSocket(ws)) + }, + Err(HandshakeError::Interrupted(handshake)) => { + Some(ConnectionState::MidHandshake(handshake)) + }, + Err(HandshakeError::Failure(err)) => { + eprintln!("handshake error: {:?}", err); + + None + } + } + }, + Self::MidHandshake(handshake) => { + match handshake.handshake() { + Ok((ws, _)) => { + Some(ConnectionState::WebSocket(ws)) + }, + Err(HandshakeError::Interrupted(handshake)) => { + Some(ConnectionState::MidHandshake(handshake)) + }, + Err(HandshakeError::Failure(err)) => { + eprintln!("handshake error: {:?}", err); + + None + } + } + }, + Self::WebSocket(ws) => Some(Self::WebSocket(ws)), + } + } + +} + + +pub struct Connection { + stream: ConnectionState, + can_send_initial: bool, + marked_as_complete: bool +} + + +impl Connection { + pub fn create_and_register( + config: &Config, + connections: &mut ConnectionMap, + poll: &mut Poll, + token_counter: &mut usize, + ) -> anyhow::Result<()> { + let mut stream = TcpStream::connect(config.server_address)?; + + poll.registry() + .register(&mut stream, Token(*token_counter), Interest::READABLE) + .unwrap(); + + let connection = Connection { + stream: ConnectionState::TcpStream(stream), + can_send_initial: false, + marked_as_complete: false, + }; + + connections.insert(*token_counter, connection); + + *token_counter = *token_counter + 1; + + Ok(()) + } + + pub fn advance(self, config: &Config) -> Option { + if let Some(stream) = self.stream.advance(config){ + Some(Self { + stream, + can_send_initial: self.can_send_initial, + marked_as_complete: false, + }) + } else { + None + } + } + + pub fn read_response_and_send_request( + &mut self, + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, + ){ + if let ConnectionState::WebSocket(ref mut ws) = self.stream { + loop { + match ws.read_message(){ + Ok(message) => { + Self::register_response_type(state, message); + + break; + }, + Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => { + self.can_send_initial = false; + + eprintln!("handle_read_event error would block: {}", err); + + return; + }, + Err(err) => { + eprintln!("handle_read_event error: {}", err); + + return; + } + } + }; + + self.send_request( + config, + state, + rng, + ); + } + } + + fn register_response_type( + state: &LoadTestState, + message: ::tungstenite::Message, + ){ + state.statistics.responses_announce.fetch_add(1, Ordering::SeqCst); // FIXME + } + + pub fn send_request( + &mut self, + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, + ){ + if let ConnectionState::WebSocket(ref mut ws) = self.stream { + let request = create_random_request( + &config, + &state, + rng + ); + + let message = request.to_ws_message(); + + match ws.write_message(message){ + Ok(_) => { + state.statistics.requests.fetch_add(1, Ordering::SeqCst); + }, + Err(err) => { + eprintln!("send request error: {}", err); + } + } + + self.can_send_initial = false; + } else { + println!("send request can't send to non-ws stream"); + } + } +} + + +pub type ConnectionMap = HashMap; + + +const NUM_CONNECTIONS: usize = 5; +const CREATE_CONN_INTERVAL: usize = 2 ^ 30; + + +pub fn run_socket_thread( + config: &Config, + state: LoadTestState, + num_initial_requests: usize, +) { + let timeout = Duration::from_micros(config.network.poll_timeout_microseconds); + + let mut connections: ConnectionMap = HashMap::with_capacity(NUM_CONNECTIONS); + let mut poll = Poll::new().expect("create poll"); + let mut events = Events::with_capacity(config.network.poll_event_capacity); + let mut rng = SmallRng::from_entropy(); + + let mut token_counter = 0usize; + + for _ in 0..num_initial_requests { + Connection::create_and_register( + config, + &mut connections, + &mut poll, + &mut token_counter, + ).unwrap(); + } + + println!("num connections in map: {}", connections.len()); + + let mut initial_sent = false; + let mut iter_counter = 0usize; + + let mut num_completed = 0usize; + + loop { + poll.poll(&mut events, Some(timeout)) + .expect("failed polling"); + + for event in events.iter(){ + if event.is_readable(){ + let token = event.token(); + + let mut run_advance = false; + + if let Some(connection) = connections.get_mut(&token.0){ + if let ConnectionState::WebSocket(_) = connection.stream { + connection.read_response_and_send_request( + config, + &state, + &mut rng, + ); + } else { + run_advance = true; + + println!("set run_advance=true"); + } + } else { + eprintln!("connection not found: {:?}", token); + } + + if run_advance { + let connection = connections.remove(&token.0).unwrap(); + + if let Some(connection) = connection.advance(config){ + println!("advanced connection"); + connections.insert(token.0, connection); + } + } + } + } + + if num_completed != token_counter { + for k in 0..token_counter { + if let Some(mut connection) = connections.remove(&k){ + if let ConnectionState::WebSocket(_) = connection.stream { + if !connection.marked_as_complete { + connection.can_send_initial = true; + connection.marked_as_complete = true; + initial_sent = false; + num_completed += 1; + } + + connections.insert(k, connection); + + } else { + if let Some(connection) = connection.advance(config){ + connections.insert(k, connection); + } + } + } else { + // println!("connection not found for token {}", k); + } + } + } + + if !initial_sent { + for (_, connection) in connections.iter_mut(){ + if connection.can_send_initial { + + connection.send_request( + config, + &state, + &mut rng, + ); + + initial_sent = true; + } + } + } + + // Slowly create new connections + if token_counter < NUM_CONNECTIONS && iter_counter % CREATE_CONN_INTERVAL == 0 { + let res = Connection::create_and_register( + config, + &mut connections, + &mut poll, + &mut token_counter, + ); + + if let Err(err) = res { + eprintln!("create connection error: {}", err); + } + + // initial_sent = false; + } + + iter_counter = iter_counter.wrapping_add(1); + } +} diff --git a/aquatic_ws_load_test/src/utils.rs b/aquatic_ws_load_test/src/utils.rs new file mode 100644 index 0000000..138d6f7 --- /dev/null +++ b/aquatic_ws_load_test/src/utils.rs @@ -0,0 +1,114 @@ +use std::sync::Arc; + +use rand::distributions::WeightedIndex; +use rand_distr::Pareto; +use rand::prelude::*; + +use crate::common::*; +use crate::config::*; + + +pub fn create_random_request( + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, +) -> InMessage { + let weights = [ + config.torrents.weight_announce as u32, + config.torrents.weight_scrape as u32, + ]; + + let items = [ + RequestType::Announce, + RequestType::Scrape, + ]; + + let dist = WeightedIndex::new(&weights) + .expect("random request weighted index"); + + match items[dist.sample(rng)] { + RequestType::Announce => create_announce_request( + config, + state, + rng, + ), + RequestType::Scrape => create_scrape_request( + config, + state, + rng, + ) + } +} + + +#[inline] +fn create_announce_request( + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, +) -> InMessage { + let (event, bytes_left) = { + if rng.gen_bool(config.torrents.peer_seeder_probability) { + (AnnounceEvent::Completed, 0) + } else { + (AnnounceEvent::Started, 50) + } + }; + + let info_hash_index = select_info_hash_index(config, &state, rng); + + InMessage::AnnounceRequest(AnnounceRequest { + info_hash: state.info_hashes[info_hash_index], + peer_id: PeerId(rng.gen()), + bytes_left: Some(bytes_left), + event, + numwant: None, + offers: None, // FIXME + answer: None, + to_peer_id: None, + offer_id: None, + }) +} + + +#[inline] +fn create_scrape_request( + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, +) -> InMessage { + let mut scrape_hashes = Vec::with_capacity(5); + + for _ in 0..5 { + let info_hash_index = select_info_hash_index(config, &state, rng); + + scrape_hashes.push(state.info_hashes[info_hash_index]); + } + + InMessage::ScrapeRequest(ScrapeRequest { + info_hashes: scrape_hashes, + }) +} + + +#[inline] +fn select_info_hash_index( + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, +) -> usize { + pareto_usize(rng, &state.pareto, config.torrents.number_of_torrents - 1) +} + + +#[inline] +fn pareto_usize( + rng: &mut impl Rng, + pareto: &Arc>, + max: usize, +) -> usize { + let p: f64 = pareto.sample(rng); + let p = (p.min(101.0f64) - 1.0) / 100.0; + + (p * max as f64) as usize +} \ No newline at end of file diff --git a/aquatic_ws_protocol/Cargo.toml b/aquatic_ws_protocol/Cargo.toml new file mode 100644 index 0000000..f729d84 --- /dev/null +++ b/aquatic_ws_protocol/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "aquatic_ws_protocol" +version = "0.1.0" +authors = ["Joakim FrostegÄrd "] +edition = "2018" +license = "Apache-2.0" + +[lib] +name = "aquatic_ws_protocol" + +[dependencies] +hashbrown = { version = "0.8", features = ["serde"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tungstenite = "0.11" + +[dev-dependencies] \ No newline at end of file diff --git a/aquatic_ws/src/lib/protocol/mod.rs b/aquatic_ws_protocol/src/lib.rs similarity index 92% rename from aquatic_ws/src/lib/protocol/mod.rs rename to aquatic_ws_protocol/src/lib.rs index 9609069..b4870bc 100644 --- a/aquatic_ws/src/lib/protocol/mod.rs +++ b/aquatic_ws_protocol/src/lib.rs @@ -45,7 +45,7 @@ pub struct OfferId( pub struct JsonValue(pub ::serde_json::Value); -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum AnnounceEvent { Started, @@ -92,14 +92,14 @@ pub struct MiddlemanAnswerToPeer { /// Element of AnnounceRequest.offers -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct AnnounceRequestOffer { pub offer: JsonValue, pub offer_id: OfferId, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct AnnounceRequest { pub info_hash: InfoHash, pub peer_id: PeerId, @@ -145,7 +145,7 @@ pub struct AnnounceResponse { } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ScrapeRequest { // If omitted, scrape for all torrents, apparently // There is some kind of parsing here too which accepts a single info hash @@ -244,6 +244,19 @@ impl InMessage { None } + + pub fn to_ws_message(&self) -> ::tungstenite::Message { + let text = match self { + InMessage::AnnounceRequest(r) => { + serde_json::to_string(&ActionWrapper::announce(r)).unwrap() + }, + InMessage::ScrapeRequest(r) => { + serde_json::to_string(&ActionWrapper::scrape(r)).unwrap() + }, + }; + + ::tungstenite::Message::from(text) + } } diff --git a/aquatic_ws/src/lib/protocol/serde_helpers.rs b/aquatic_ws_protocol/src/serde_helpers.rs similarity index 100% rename from aquatic_ws/src/lib/protocol/serde_helpers.rs rename to aquatic_ws_protocol/src/serde_helpers.rs diff --git a/scripts/run-load-test-ws.sh b/scripts/run-load-test-ws.sh new file mode 100755 index 0000000..1451c27 --- /dev/null +++ b/scripts/run-load-test-ws.sh @@ -0,0 +1,13 @@ +#!/bin/sh + +# Compile with target-cpu=native but without AVX512 features, since they +# decrease performance. + +DISABLE_AVX512=$(rustc --print target-features | grep " avx512" | + awk '{print $1}' | sed 's/^/-C target-feature=-/' | xargs) + +export RUSTFLAGS="-C target-cpu=native $DISABLE_AVX512" + +echo "Compiling with RUSTFLAGS=$RUSTFLAGS" + +cargo run --release --bin aquatic_ws_load_test -- $@ \ No newline at end of file