diff --git a/Cargo.lock b/Cargo.lock index 0ed52cf..d2439c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -105,6 +105,7 @@ dependencies = [ "aquatic_cli_helpers", "aquatic_http_protocol", "crossbeam-channel", + "hashbrown", "mimalloc", "mio", "quickcheck", @@ -123,6 +124,7 @@ dependencies = [ "bendy", "criterion", "hashbrown", + "hex", "itoa", "log", "memchr", @@ -824,6 +826,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hex" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35" + [[package]] name = "histogram" version = "0.6.9" diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index e6377b8..f7514da 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -13,6 +13,7 @@ anyhow = "1" aquatic_cli_helpers = { path = "../aquatic_cli_helpers" } aquatic_http_protocol = { path = "../aquatic_http_protocol" } crossbeam-channel = "0.4" +hashbrown = { version = "0.7", features = ["serde"] } mimalloc = { version = "0.1", default-features = false } mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } rand = { version = "0.7", features = ["small_rng"] } diff --git a/aquatic_http_load_test/src/common.rs b/aquatic_http_load_test/src/common.rs index e24ef7d..a8cfb30 100644 --- a/aquatic_http_load_test/src/common.rs +++ b/aquatic_http_load_test/src/common.rs @@ -133,7 +133,7 @@ impl Default for HandlerConfig { peer_seeder_probability: 0.25, scrape_max_torrents: 50, weight_announce: 5, - weight_scrape: 1, + weight_scrape: 0, additional_request_factor: 0.4, max_responses_per_iter: 10_000, channel_timeout: 200, diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index 388ed06..6b607f1 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -40,8 +40,10 @@ fn run(config: Config) -> ::anyhow::Result<()> { let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents); + let mut rng = SmallRng::from_entropy(); + for _ in 0..config.handler.number_of_torrents { - info_hashes.push(generate_info_hash()); + info_hashes.push(generate_info_hash(&mut rng)); } let pareto = Pareto::new( @@ -57,60 +59,20 @@ fn run(config: Config) -> ::anyhow::Result<()> { // Start socket workers - let (response_sender, response_receiver) = unbounded(); - let mut request_senders = Vec::new(); - for i in 0..config.num_socket_workers { - let thread_id = ThreadId(i); + for _ in 0..config.num_socket_workers { let (sender, receiver) = unbounded(); - let port = config.network.first_port + (i as u16); - - let addr = if config.network.multiple_client_ips { - let ip = if config.network.ipv6_client { // FIXME: test ipv6 - Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1 + i as u16).into() - } else { - Ipv4Addr::new(127, 0, 0, 1 + i).into() - }; - - SocketAddr::new(ip, port) - } else { - let ip = if config.network.ipv6_client { - Ipv6Addr::LOCALHOST.into() - } else { - Ipv4Addr::LOCALHOST.into() - }; - - SocketAddr::new(ip, port) - }; request_senders.push(sender); let config = config.clone(); - let response_sender = response_sender.clone(); let state = state.clone(); thread::spawn(move || run_socket_thread( + &config, state, - response_sender, receiver, - &config, - addr, - thread_id - )); - } - - for _ in 0..config.num_request_workers { - let config = config.clone(); - let state= state.clone(); - let request_senders = request_senders.clone(); - let response_receiver = response_receiver.clone(); - - thread::spawn(move || run_handler_thread( - &config, - state, - request_senders, - response_receiver, )); } diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index 334964c..be583bd 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -1,198 +1,248 @@ -use std::io::Cursor; use std::net::SocketAddr; use std::sync::atomic::Ordering; use std::time::Duration; +use std::io::{Read, Write, ErrorKind}; +use anyhow::Context; use crossbeam_channel::{Receiver, Sender}; -use mio::{net::UdpSocket, Events, Poll, Interest, Token}; +use hashbrown::HashMap; +use mio::{net::TcpStream, Events, Poll, Interest, Token}; use socket2::{Socket, Domain, Type, Protocol}; +use rand::{rngs::SmallRng, prelude::*}; use crate::common::*; +use crate::handler::create_random_request; const MAX_PACKET_SIZE: usize = 4096; -pub fn create_socket( - config: &Config, - addr: SocketAddr -) -> ::std::net::UdpSocket { - let socket = if addr.is_ipv4(){ - Socket::new(Domain::ipv4(), Type::dgram(), Some(Protocol::udp())) - } else { - Socket::new(Domain::ipv6(), Type::dgram(), Some(Protocol::udp())) - }.expect("create socket"); - socket.set_nonblocking(true) - .expect("socket: set nonblocking"); - - if config.network.recv_buffer != 0 { - if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer){ - eprintln!( - "socket: failed setting recv buffer to {}: {:?}", - config.network.recv_buffer, - err - ); - } +pub fn create_socket( + address: SocketAddr, + server_address: SocketAddr, + ipv6_only: bool +) -> ::anyhow::Result { + let builder = if address.is_ipv4(){ + Socket::new(Domain::ipv4(), Type::stream(), Some(Protocol::tcp())) + } else { + Socket::new(Domain::ipv6(), Type::stream(), Some(Protocol::tcp())) + }.context("Couldn't create socket2::Socket")?; + + if ipv6_only { + builder.set_only_v6(true) + .context("Couldn't put socket in ipv6 only mode")? } - socket.bind(&addr.into()) - .expect(&format!("socket: bind to {}", addr)); + builder.set_nonblocking(true) + .context("Couldn't put socket in non-blocking mode")?; + builder.set_reuse_port(true) + .context("Couldn't put socket in reuse_port mode")?; + builder.connect(&server_address.into()).with_context(|| + format!("Couldn't connect to address {}", server_address) + )?; - socket.connect(&config.server_address.into()) - .expect("socket: connect to server"); - - socket.into_udp_socket() + Ok(TcpStream::from_std(builder.into_tcp_stream())) } -pub fn run_socket_thread( - state: LoadTestState, - response_channel_sender: Sender<(ThreadId, Response)>, - request_receiver: Receiver, - config: &Config, - addr: SocketAddr, - thread_id: ThreadId -) { - let mut socket = UdpSocket::from_std(create_socket(config, addr)); - let mut buffer = [0u8; MAX_PACKET_SIZE]; - let token = Token(thread_id.0 as usize); - let interests = Interest::READABLE; +pub struct Connection { + stream: TcpStream, + read_buffer: [u8; 2048], + bytes_read: usize, + can_send: bool, +} + + +impl Connection { + pub fn create_and_register( + config: &Config, + state: &LoadTestState, + connections: &mut ConnectionMap, + poll: &mut Poll, + token_counter: &mut usize, + ) -> anyhow::Result<()> { + let mut stream = TcpStream::connect(config.server_address)?; + + poll.registry() + .register(&mut stream, Token(*token_counter), Interest::READABLE) + .unwrap(); + + let connection = Connection { + stream, + read_buffer: [0; 2048], + bytes_read: 0, + can_send: true, + }; + + connections.insert(*token_counter, connection); + + *token_counter = token_counter.wrapping_add(1); + + Ok(()) + } + + pub fn read_response_and_send_request( + &mut self, + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, + ) -> bool { // true = response received + loop { + match self.stream.read(&mut self.read_buffer){ + Ok(bytes_read) => { + self.bytes_read = bytes_read; + + break; + }, + Err(err) if err.kind() == ErrorKind::WouldBlock => { + self.can_send = false; + + eprintln!("handle_read_event error would block: {}", err); + + return false; + }, + Err(err) => { + self.bytes_read = 0; + + eprintln!("handle_read_event error: {}", err); + + return false; + } + } + }; + + let res_response = Response::from_bytes( + &self.read_buffer[..self.bytes_read] + ); + + self.bytes_read = 0; + + match res_response { + Ok(Response::Announce(_)) => { + state.statistics.responses_announce + .fetch_add(1, Ordering::SeqCst); + }, + Ok(Response::Scrape(_)) => { + state.statistics.responses_scrape + .fetch_add(1, Ordering::SeqCst); + }, + Ok(Response::Failure(_)) => { + state.statistics.responses_failure + .fetch_add(1, Ordering::SeqCst); + }, + Err(err) => { + eprintln!("response from bytes error: {}", err); + + return false; + } + } + + self.send_request( + config, + state, + rng + ); + + true + } + + pub fn send_request( + &mut self, + config: &Config, + state: &LoadTestState, + rng: &mut impl Rng, + ){ + let request = create_random_request( + &config, + &state, + rng + ); + + match self.send_request_inner(&request.as_bytes()){ + Ok(_) => { + state.statistics.requests.fetch_add(1, Ordering::SeqCst); + }, + Err(err) => { + eprintln!("send request error: {}", err); + } + } + + self.can_send = false; + } + + fn send_request_inner(&mut self, request: &[u8]) -> ::std::io::Result<()> { + self.stream.write(request)?; + self.stream.flush()?; + + Ok(()) + } + +} + + +pub type ConnectionMap = HashMap; + + +pub fn run_socket_thread( + config: &Config, + state: LoadTestState, + request_receiver: Receiver, +) { let timeout = Duration::from_micros(config.network.poll_timeout); + let mut connections: ConnectionMap = HashMap::new(); let mut poll = Poll::new().expect("create poll"); - - poll.registry() - .register(&mut socket, token, interests) - .unwrap(); - let mut events = Events::with_capacity(config.network.poll_event_capacity); + let mut rng = SmallRng::from_entropy(); - let mut statistics = SocketWorkerLocalStatistics::default(); - let mut responses = Vec::new(); + let mut token_counter = 0usize; + + for _ in request_receiver.try_recv(){ + Connection::create_and_register( + config, + &state, + &mut connections, + &mut poll, + &mut token_counter, + ).unwrap(); + } loop { + let mut responses_received = 0usize; + poll.poll(&mut events, Some(timeout)) .expect("failed polling"); for event in events.iter(){ - if event.token() == token { - if event.is_readable(){ - read_responses( - thread_id, - &socket, - &mut buffer, - &mut statistics, - &mut responses - ); + if event.is_readable(){ + let token = event.token(); - for r in responses.drain(..){ - response_channel_sender.send(r) - .expect(&format!( - "add response to channel in socket worker {}", - thread_id.0 - )); + if let Some(connection) = connections.get_mut(&token.0){ + if connection.read_response_and_send_request(config, &state, &mut rng){ + responses_received += 1; } - - poll.registry() - .reregister(&mut socket, token, interests) - .unwrap(); + } else { + eprintln!("connection not found: {:?}", token); } } + } - send_requests( + for (_, connection) in connections.iter_mut(){ + if connection.can_send { + connection.send_request(config, &state, &mut rng); + } + } + + if token_counter < 1 && responses_received > 0 { + Connection::create_and_register( + config, &state, - &mut socket, - &mut buffer, - &request_receiver, - &mut statistics - ); - } - - send_requests( - &state, - &mut socket, - &mut buffer, - &request_receiver, - &mut statistics - ); - - state.statistics.requests - .fetch_add(statistics.requests, Ordering::SeqCst); - state.statistics.responses_announce - .fetch_add(statistics.responses_announce, Ordering::SeqCst); - state.statistics.responses_scrape - .fetch_add(statistics.responses_scrape, Ordering::SeqCst); - state.statistics.responses_failure - .fetch_add(statistics.responses_failure, Ordering::SeqCst); - state.statistics.response_peers - .fetch_add(statistics.response_peers, Ordering::SeqCst); - - statistics = SocketWorkerLocalStatistics::default(); - } -} - - -fn read_responses( - thread_id: ThreadId, - socket: &UdpSocket, - buffer: &mut [u8], - ls: &mut SocketWorkerLocalStatistics, - responses: &mut Vec<(ThreadId, Response)>, -){ - while let Ok(amt) = socket.recv(buffer) { - match response_from_bytes(&buffer[0..amt]){ - Ok(response) => { - match response { - Response::Announce(ref r) => { - ls.responses_announce += 1; - ls.response_peers += r.peers.len(); - }, - Response::Scrape(_) => { - ls.responses_scrape += 1; - }, - Response::Failure(_) => { - ls.responses_failure += 1; - }, - } - - responses.push((thread_id, response)) - }, - Err(err) => { - eprintln!("Received invalid response: {:#?}", err); - } + &mut connections, + &mut poll, + &mut token_counter, + ).unwrap(); } } } - - -fn send_requests( - state: &LoadTestState, - socket: &mut UdpSocket, - buffer: &mut [u8], - receiver: &Receiver, - statistics: &mut SocketWorkerLocalStatistics, -){ - let mut cursor = Cursor::new(buffer); - - while let Ok(request) = receiver.try_recv() { - cursor.set_position(0); - - if let Err(err) = request_to_bytes(&mut cursor, request){ - eprintln!("request_to_bytes err: {}", err); - } - - let position = cursor.position() as usize; - let inner = cursor.get_ref(); - - match socket.send(&inner[..position]) { - Ok(_) => { - statistics.requests += 1; - }, - Err(err) => { - eprintln!("Couldn't send packet: {:?}", err); - } - } - } -} \ No newline at end of file diff --git a/aquatic_http_load_test/src/utils.rs b/aquatic_http_load_test/src/utils.rs index b28eb9f..85c438a 100644 --- a/aquatic_http_load_test/src/utils.rs +++ b/aquatic_http_load_test/src/utils.rs @@ -11,13 +11,13 @@ pub fn select_info_hash_index( state: &LoadTestState, rng: &mut impl Rng, ) -> usize { - pareto_usize(rng, state.pareto, config.handler.number_of_torrents - 1) + pareto_usize(rng, &state.pareto, config.handler.number_of_torrents - 1) } pub fn pareto_usize( rng: &mut impl Rng, - pareto: Arc>, + pareto: &Arc>, max: usize, ) -> usize { let p: f64 = pareto.sample(rng); @@ -27,27 +27,6 @@ pub fn pareto_usize( } -pub fn generate_peer_id() -> PeerId { - PeerId(random_20_bytes()) +pub fn generate_info_hash(rng: &mut impl Rng) -> InfoHash { + InfoHash(rng.gen()) } - - -pub fn generate_info_hash() -> InfoHash { - InfoHash(random_20_bytes()) -} - - -// Don't use SmallRng here for now -fn random_20_bytes() -> [u8; 20] { - let mut bytes = [0; 20]; - - for (i, b) in rand::thread_rng() - .sample_iter(&Standard) - .enumerate() - .take(20) { - - bytes[i] = b - } - - bytes -} \ No newline at end of file diff --git a/aquatic_http_protocol/Cargo.toml b/aquatic_http_protocol/Cargo.toml index f1de917..d9ed4f3 100644 --- a/aquatic_http_protocol/Cargo.toml +++ b/aquatic_http_protocol/Cargo.toml @@ -22,6 +22,7 @@ harness = false anyhow = "1" bendy = { version = "0.3", features = ["std", "serde"] } hashbrown = { version = "0.7", features = ["serde"] } +hex = { version = "0.4", default-features = false } itoa = "0.4" log = "0.4" memchr = "2" diff --git a/aquatic_http_protocol/src/request.rs b/aquatic_http_protocol/src/request.rs index ede9446..2717d11 100644 --- a/aquatic_http_protocol/src/request.rs +++ b/aquatic_http_protocol/src/request.rs @@ -1,3 +1,5 @@ +use std::io::Write; + use anyhow::Context; use hashbrown::HashMap; use smartstring::{SmartString, LazyCompact}; @@ -20,12 +22,80 @@ pub struct AnnounceRequest { } +impl AnnounceRequest { + pub fn as_bytes(&self) -> Vec { + let mut bytes = Vec::new(); + + bytes.extend_from_slice(b"GET /announce?info_hash="); + bytes.extend_from_slice(&urlencode_20_bytes(self.info_hash.0)); + + bytes.extend_from_slice(b"&peer_id="); + bytes.extend_from_slice(&urlencode_20_bytes(self.peer_id.0)); + + bytes.extend_from_slice(b"&port="); + itoa::write(&mut bytes, self.port); + + bytes.extend_from_slice(b"&left="); + itoa::write(&mut bytes, self.bytes_left); + + bytes.extend_from_slice(b"&event=started"); // FIXME + + bytes.extend_from_slice(b"&compact="); + itoa::write(&mut bytes, self.compact as u8); + + if let Some(numwant) = self.numwant { + bytes.extend_from_slice(b"&numwant="); + itoa::write(&mut bytes, numwant); + } + + if let Some(ref key) = self.key { + bytes.extend_from_slice(b"&key="); + bytes.extend_from_slice(key.as_str().as_bytes()); + } + + bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n"); + + bytes + } +} + + #[derive(Debug, Clone, PartialEq, Eq)] pub struct ScrapeRequest { pub info_hashes: Vec, } +impl ScrapeRequest { + pub fn as_bytes(&self) -> Vec { + let mut bytes = Vec::new(); + + bytes.extend_from_slice(b"GET /scrape?"); + + let mut first = true; + + for info_hash in self.info_hashes.iter() { + if !first { + bytes.push(b'&') + } + + bytes.extend_from_slice(b"info_hash="); + + for b in info_hash.0.iter() { + bytes.push(b'%'); + bytes.extend_from_slice(format!("{:02x}", b).as_bytes()); + } + + first = false; + } + + bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n"); + + bytes + } +} + + #[derive(Debug, Clone, PartialEq, Eq)] pub enum Request { Announce(AnnounceRequest), @@ -263,6 +333,13 @@ impl Request { Ok(processed) } + + pub fn as_bytes(&self) -> Vec { + match self { + Self::Announce(r) => r.as_bytes(), + Self::Scrape(r) => r.as_bytes(), + } + } } diff --git a/aquatic_http_protocol/src/response.rs b/aquatic_http_protocol/src/response.rs index 46c8a24..baf7057 100644 --- a/aquatic_http_protocol/src/response.rs +++ b/aquatic_http_protocol/src/response.rs @@ -187,6 +187,12 @@ impl Response { Response::Scrape(r) => r.to_bytes(), } } + + pub fn from_bytes(bytes: &[u8]) -> anyhow::Result { + Ok(Self::Failure(FailureResponse { + failure_reason: "fake response".to_string() + })) + } } diff --git a/aquatic_http_protocol/src/utils.rs b/aquatic_http_protocol/src/utils.rs index 96a6200..e6d1e89 100644 --- a/aquatic_http_protocol/src/utils.rs +++ b/aquatic_http_protocol/src/utils.rs @@ -6,6 +6,22 @@ use smartstring::{SmartString, LazyCompact}; use super::response::ResponsePeer; +pub fn urlencode_20_bytes(input: [u8; 20]) -> Vec { + let mut tmp = [0u8; 40]; + + hex::encode_to_slice(&input, &mut tmp); + + let mut output = Vec::with_capacity(60); + + for chunk in tmp.chunks_exact(2){ + output.push(b'%'); + output.extend_from_slice(chunk); + } + + output +} + + /// Not for serde pub fn deserialize_20_bytes(value: SmartString) -> anyhow::Result<[u8; 20]> { let mut arr = [0u8; 20];