From cc46c4b72cb56a2816bf373e0416270512623210 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 24 Jul 2020 22:27:06 +0200 Subject: [PATCH] aquatic_http: write to buf ref in request serialization --- aquatic_http_load_test/src/network.rs | 25 +++++++-- aquatic_http_protocol/src/request.rs | 77 +++++++++++---------------- aquatic_http_protocol/src/utils.rs | 15 ++++-- 3 files changed, 60 insertions(+), 57 deletions(-) diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index 3af69ea..3028d02 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -1,6 +1,6 @@ use std::sync::atomic::Ordering; use std::time::Duration; -use std::io::{Read, Write, ErrorKind}; +use std::io::{Read, Write, ErrorKind, Cursor}; use mio::{net::TcpStream, Events, Poll, Interest, Token}; use rand::{rngs::SmallRng, prelude::*}; @@ -53,6 +53,7 @@ impl Connection { config: &Config, state: &LoadTestState, rng: &mut impl Rng, + request_buffer: &mut Cursor<&mut [u8]>, ){ loop { match self.stream.read(&mut self.read_buffer[self.bytes_read..]){ @@ -90,7 +91,8 @@ impl Connection { self.send_request( config, state, - rng + rng, + request_buffer, ); } @@ -128,6 +130,7 @@ impl Connection { config: &Config, state: &LoadTestState, rng: &mut impl Rng, + request_buffer: &mut Cursor<&mut [u8]>, ){ let request = create_random_request( &config, @@ -135,7 +138,11 @@ impl Connection { rng ); - match self.send_request_inner(state, &request.as_bytes()){ + request_buffer.set_position(0); + request.write(request_buffer).unwrap(); + let position = request_buffer.position() as usize; + + match self.send_request_inner(state, &request_buffer.get_mut()[..position]){ Ok(_) => { state.statistics.requests.fetch_add(1, Ordering::SeqCst); }, @@ -183,6 +190,8 @@ pub fn run_socket_thread( let mut poll = Poll::new().expect("create poll"); let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut rng = SmallRng::from_entropy(); + let mut request_buffer = [0u8; 1024]; + let mut request_buffer = Cursor::new(&mut request_buffer[..]); let mut token_counter = 0usize; @@ -210,7 +219,8 @@ pub fn run_socket_thread( connection.read_response_and_send_request( config, &state, - &mut rng + &mut rng, + &mut request_buffer ); } else { eprintln!("connection not found: {:?}", token); @@ -221,7 +231,12 @@ pub fn run_socket_thread( if !initial_sent { for (_, connection) in connections.iter_mut(){ if connection.can_send_initial { - connection.send_request(config, &state, &mut rng); + connection.send_request( + config, + &state, + &mut rng, + &mut request_buffer + ); initial_sent = true; } diff --git a/aquatic_http_protocol/src/request.rs b/aquatic_http_protocol/src/request.rs index 0e5148f..cf39e3a 100644 --- a/aquatic_http_protocol/src/request.rs +++ b/aquatic_http_protocol/src/request.rs @@ -1,3 +1,5 @@ +use std::io::Write; + use anyhow::Context; use smartstring::{SmartString, LazyCompact}; @@ -20,54 +22,37 @@ pub struct AnnounceRequest { impl AnnounceRequest { - pub fn as_bytes(&self) -> Vec { - let mut bytes = Vec::with_capacity( - 24 + - 60 + - 9 + - 60 + - 6 + - 5 + // high estimate - 6 + - 2 + // estimate - 14 + // FIXME event - 9 + - 1 + - 20 + // numwant bad estimate - 20 + // key bad estimate - 13 - ); + fn write(&self, output: &mut W) -> ::std::io::Result<()> { + output.write(b"GET /announce?info_hash=")?; + urlencode_20_bytes(self.info_hash.0, output)?; - bytes.extend_from_slice(b"GET /announce?info_hash="); - urlencode_20_bytes(self.info_hash.0, &mut bytes); + output.write(b"&peer_id=")?; + urlencode_20_bytes(self.info_hash.0, output)?; - bytes.extend_from_slice(b"&peer_id="); - urlencode_20_bytes(self.info_hash.0, &mut bytes); + output.write(b"&port=")?; + output.write(itoa::Buffer::new().format(self.port).as_bytes())?; - bytes.extend_from_slice(b"&port="); - let _ = itoa::write(&mut bytes, self.port); + output.write(b"&left=")?; + output.write(itoa::Buffer::new().format(self.bytes_left).as_bytes())?; - bytes.extend_from_slice(b"&left="); - let _ = itoa::write(&mut bytes, self.bytes_left); + output.write(b"&event=started")?; // FIXME - bytes.extend_from_slice(b"&event=started"); // FIXME - - bytes.extend_from_slice(b"&compact="); - let _ = itoa::write(&mut bytes, self.compact as u8); + output.write(b"&compact=")?; + output.write(itoa::Buffer::new().format(self.compact as u8).as_bytes())?; if let Some(numwant) = self.numwant { - bytes.extend_from_slice(b"&numwant="); - let _ = itoa::write(&mut bytes, numwant); + output.write(b"&numwant=")?; + output.write(itoa::Buffer::new().format(numwant).as_bytes())?; } if let Some(ref key) = self.key { - bytes.extend_from_slice(b"&key="); - bytes.extend_from_slice(key.as_str().as_bytes()); + output.write(b"&key=")?; + output.write(key.as_str().as_bytes())?; } - bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n"); + output.write(b" HTTP/1.1\r\n\r\n")?; - bytes + Ok(()) } } @@ -79,27 +64,25 @@ pub struct ScrapeRequest { impl ScrapeRequest { - pub fn as_bytes(&self) -> Vec { - let mut bytes = Vec::new(); - - bytes.extend_from_slice(b"GET /scrape?"); + fn write(&self, output: &mut W) -> ::std::io::Result<()> { + output.write(b"GET /scrape?")?; let mut first = true; for info_hash in self.info_hashes.iter() { if !first { - bytes.push(b'&') + output.write(b"&")?; } - bytes.extend_from_slice(b"info_hash="); - urlencode_20_bytes(info_hash.0, &mut bytes); + output.write(b"info_hash=")?; + urlencode_20_bytes(info_hash.0, output)?; first = false; } - bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n"); + output.write(b" HTTP/1.1\r\n\r\n")?; - bytes + Ok(()) } } @@ -270,10 +253,10 @@ impl Request { } } - pub fn as_bytes(&self) -> Vec { + pub fn write(&self, output: &mut W) -> ::std::io::Result<()> { match self { - Self::Announce(r) => r.as_bytes(), - Self::Scrape(r) => r.as_bytes(), + Self::Announce(r) => r.write(output), + Self::Scrape(r) => r.write(output), } } } diff --git a/aquatic_http_protocol/src/utils.rs b/aquatic_http_protocol/src/utils.rs index 1ac1b76..17adce4 100644 --- a/aquatic_http_protocol/src/utils.rs +++ b/aquatic_http_protocol/src/utils.rs @@ -8,15 +8,20 @@ use smartstring::{SmartString, LazyCompact}; use super::response::ResponsePeer; -pub fn urlencode_20_bytes(input: [u8; 20], output: &mut impl Write){ +pub fn urlencode_20_bytes( + input: [u8; 20], + output: &mut impl Write +) -> ::std::io::Result<()> { let mut tmp = [0u8; 2]; for i in 0..input.len() { hex::encode_to_slice(&input[i..i+1], &mut tmp).unwrap(); - output.write(b"%"); - output.write(&tmp); + output.write(b"%")?; + output.write(&tmp)?; } + + Ok(()) } @@ -160,7 +165,7 @@ mod tests { let mut output = Vec::new(); - urlencode_20_bytes(input, &mut output); + urlencode_20_bytes(input, &mut output).unwrap(); assert_eq!(output.len(), 60); @@ -195,7 +200,7 @@ mod tests { let mut output = Vec::new(); - urlencode_20_bytes(input, &mut output); + urlencode_20_bytes(input, &mut output).unwrap(); let s = ::std::str::from_utf8(&output).unwrap();