From a487347a0d74fc5c53b71a94971b865ec59c0151 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 2 Jul 2020 13:21:39 +0200 Subject: [PATCH] aquatic_http: continue implementation work --- TODO.md | 6 +- aquatic_http/src/lib/network/connection.rs | 64 ++++++---- aquatic_http/src/lib/network/mod.rs | 26 ++-- aquatic_http/src/lib/protocol/mod.rs | 48 ++++--- .../src/lib/protocol/serde_helpers.rs | 118 ------------------ scripts/run-aquatic-http.sh | 5 + 6 files changed, 96 insertions(+), 171 deletions(-) create mode 100755 scripts/run-aquatic-http.sh diff --git a/TODO.md b/TODO.md index c1d0251..f286c52 100644 --- a/TODO.md +++ b/TODO.md @@ -7,9 +7,11 @@ ## aquatic_http * handshake stuff + * fix overcomplicated and probably incorrect implementation * support TLS and plain at the same time?? - * simplify -* test +* response serialization, https://crates.io/crates/bendy +* response content length +* scrape info hash parsing * move stuff to common crate with ws: what about Request/InMessage etc? ## aquatic_ws diff --git a/aquatic_http/src/lib/network/connection.rs b/aquatic_http/src/lib/network/connection.rs index 6534d33..2b7e902 100644 --- a/aquatic_http/src/lib/network/connection.rs +++ b/aquatic_http/src/lib/network/connection.rs @@ -88,7 +88,6 @@ impl Write for Stream { pub enum RequestParseError { NeedMoreData, Invalid, - Incomplete, Io(::std::io::Error), Parse(::httparse::Error) } @@ -97,7 +96,8 @@ pub enum RequestParseError { pub struct EstablishedConnection { stream: Stream, pub peer_addr: SocketAddr, - buf: Vec, + buf: [u8; 1024], + bytes_read: usize, } @@ -108,17 +108,17 @@ impl EstablishedConnection { Self { stream, peer_addr, - buf: Vec::new(), // FIXME: with capacity of like 100? + buf: [0; 1024], // FIXME: fixed size is stupid + bytes_read: 0, } } pub fn parse_request(&mut self) -> Result { - match self.stream.read(&mut self.buf){ - Ok(0) => { - // FIXME: finished reading completely here? - }, - Ok(_) => { - return Err(RequestParseError::NeedMoreData); + match self.stream.read(&mut self.buf[self.bytes_read..]){ + Ok(bytes_read) => { + self.bytes_read += bytes_read; + + info!("parse request read {} bytes", bytes_read); }, Err(err) if err.kind() == ErrorKind::WouldBlock => { return Err(RequestParseError::NeedMoreData); @@ -128,45 +128,49 @@ impl EstablishedConnection { } } - let mut headers = [httparse::EMPTY_HEADER; 1]; + if self.bytes_read == 0 { + return Err(RequestParseError::NeedMoreData); // FIXME: ??? + } + + let mut headers = [httparse::EMPTY_HEADER; 16]; let mut request = httparse::Request::new(&mut headers); - let request = match request.parse(&self.buf){ + let request = match request.parse(&self.buf[..self.bytes_read]){ Ok(httparse::Status::Complete(_)) => { - if let Some(request) = Request::from_http(request){ + let result = if let Some(request) = Request::from_http(request){ Ok(request) } else { Err(RequestParseError::Invalid) - } + }; + + self.bytes_read = 0; + + result }, Ok(httparse::Status::Partial) => { - Err(RequestParseError::Incomplete) + Err(RequestParseError::NeedMoreData) }, Err(err) => { + self.bytes_read = 0; + Err(RequestParseError::Parse(err)) } }; - self.buf.clear(); - self.buf.shrink_to_fit(); - request } - pub fn send_response(&mut self, body: &str) -> Result<(), RequestParseError> { + pub fn send_response(&mut self, body: &str) -> ::std::io::Result<()> { let mut response = String::new(); - response.push_str("200 OK\r\n\r\n"); + response.push_str("HTTP/1.1 200 OK\r\n\r\n"); // FIXME: content-length response.push_str(body); + response.push_str("\r\n"); - match self.stream.write(response.as_bytes()){ - Ok(_) => Ok(()), - Err(err) => { - info!("send response: {:?}", err); + self.stream.write(response.as_bytes())?; + self.stream.flush()?; - Err(RequestParseError::Io(err)) - } - } + Ok(()) } } @@ -195,7 +199,13 @@ impl <'a>HandshakeMachine { tls_acceptor.accept(stream) ) } else { - (Some(Either::Left(EstablishedConnection::new(Stream::TcpStream(stream)))), false) + log::debug!("established connection"); + + let established_connection = EstablishedConnection::new( + Stream::TcpStream(stream) + ); + + (Some(Either::Left(established_connection)), false) } }, HandshakeMachine::TlsMidHandshake(handshake) => { diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index 417ef36..2de3745 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -182,7 +182,7 @@ pub fn run_handshake_and_read_requests<'a>( peer_addr: established_connection.peer_addr }; - debug!("read message"); + debug!("read request, sending to handler"); if let Err(err) = request_channel_sender .send((meta, request)) @@ -192,12 +192,16 @@ pub fn run_handshake_and_read_requests<'a>( err ); } + + break }, Err(RequestParseError::NeedMoreData) => { + info!("need more data"); + break; }, Err(RequestParseError::Io(err)) => { - info!("error reading messages: {}", err); + info!("error reading request: {}", err); remove_connection_if_exists(connections, poll_token); @@ -205,6 +209,8 @@ pub fn run_handshake_and_read_requests<'a>( }, Err(e) => { info!("error reading request: {:?}", e); + + remove_connection_if_exists(connections, poll_token); break; }, @@ -245,10 +251,17 @@ pub fn send_responses( match established.send_response(&response.to_http_string()){ Ok(()) => { - debug!("sent message"); + debug!("sent response"); + + remove_connection_if_exists( + connections, + meta.poll_token + ); }, - Err(RequestParseError::NeedMoreData) => {}, // FIXME: block? - Err(RequestParseError::Io(err)) => { + Err(err) if err.kind() == ErrorKind::WouldBlock => { + debug!("send response: would block"); + }, + Err(err) => { info!("error sending response: {}", err); remove_connection_if_exists( @@ -256,9 +269,6 @@ pub fn send_responses( meta.poll_token ); }, - _ => { - unreachable!() - } } } } diff --git a/aquatic_http/src/lib/protocol/mod.rs b/aquatic_http/src/lib/protocol/mod.rs index ed02800..59f73ba 100644 --- a/aquatic_http/src/lib/protocol/mod.rs +++ b/aquatic_http/src/lib/protocol/mod.rs @@ -9,25 +9,25 @@ use crate::common::Peer; // use serde_helpers::*; -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] #[serde(transparent)] pub struct PeerId( // #[serde( // deserialize_with = "deserialize_20_bytes", // serialize_with = "serialize_20_bytes" // )] - pub [u8; 20] + pub String ); -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] #[serde(transparent)] pub struct InfoHash( // #[serde( // deserialize_with = "deserialize_20_bytes", // serialize_with = "serialize_20_bytes" // )] - pub [u8; 20] + pub String ); @@ -76,8 +76,8 @@ pub struct AnnounceRequest { pub bytes_left: usize, #[serde(default)] pub event: AnnounceEvent, - /// FIXME: number: 0 or 1 - pub compact: bool, + /// FIXME: number: 0 or 1 to bool + pub compact: u8, /// Requested number of peers to return pub numwant: usize, } @@ -130,16 +130,32 @@ pub enum Request { impl Request { pub fn from_http(http: httparse::Request) -> Option { - http.path - .and_then(|path| { - let mut iterator = path.splitn(2, '?'); + log::debug!("path: {:?}", http.path); - iterator.next(); - iterator.next() - }) - .and_then(|query_string| { - serde_urlencoded::from_str(query_string).ok() - }) + let path = http.path?; + + let mut split_parts= path.splitn(2, '?'); + + let path = split_parts.next()?; + let query_string = split_parts.next()?; + + if path == "/announce" { + let result: Result = serde_urlencoded::from_str(query_string); + + if let Err(ref err) = result { + log::debug!("error: {}", err); + } + + result.ok().map(Request::Announce) + } else { + let result: Result = serde_urlencoded::from_str(query_string); + + if let Err(ref err) = result { + log::debug!("error: {}", err); + } + + result.ok().map(Request::Scrape) + } } } @@ -154,6 +170,6 @@ pub enum Response { impl Response { pub fn to_http_string(self) -> String { - unimplemented!() + "(response)".to_string() } } \ No newline at end of file diff --git a/aquatic_http/src/lib/protocol/serde_helpers.rs b/aquatic_http/src/lib/protocol/serde_helpers.rs index f7cea32..a59b525 100644 --- a/aquatic_http/src/lib/protocol/serde_helpers.rs +++ b/aquatic_http/src/lib/protocol/serde_helpers.rs @@ -126,122 +126,4 @@ pub fn deserialize_info_hashes<'de, D>( where D: Deserializer<'de>, { Ok(deserializer.deserialize_any(InfoHashVecVisitor).unwrap_or_default()) -} - - -#[cfg(test)] -mod tests { - use serde::Deserialize; - - use super::*; - - fn info_hash_from_bytes(bytes: &[u8]) -> InfoHash { - let mut arr = [0u8; 20]; - - assert!(bytes.len() == 20); - - arr.copy_from_slice(&bytes[..]); - - InfoHash(arr) - } - - #[test] - fn test_deserialize_20_bytes(){ - let input = r#""aaaabbbbccccddddeeee""#; - - let expected = info_hash_from_bytes(b"aaaabbbbccccddddeeee"); - let observed: InfoHash = serde_json::from_str(input).unwrap(); - - assert_eq!(observed, expected); - - let input = r#""aaaabbbbccccddddeee""#; - let res_info_hash: Result = serde_json::from_str(input); - - assert!(res_info_hash.is_err()); - - let input = r#""aaaabbbbccccddddeeeš•Š""#; - let res_info_hash: Result = serde_json::from_str(input); - - assert!(res_info_hash.is_err()); - } - - #[test] - fn test_serde_20_bytes(){ - let info_hash = info_hash_from_bytes(b"aaaabbbbccccddddeeee"); - - let out = serde_json::to_string(&info_hash).unwrap(); - let info_hash_2 = serde_json::from_str(&out).unwrap(); - - assert_eq!(info_hash, info_hash_2); - } - - #[derive(Debug, PartialEq, Eq, Deserialize)] - struct Test { - #[serde(deserialize_with = "deserialize_info_hashes", default)] - info_hashes: Vec, - } - - - #[test] - fn test_deserialize_info_hashes_vec(){ - let input = r#"{ - "info_hashes": ["aaaabbbbccccddddeeee", "aaaabbbbccccddddeeee"] - }"#; - - let expected = Test { - info_hashes: vec![ - info_hash_from_bytes(b"aaaabbbbccccddddeeee"), - info_hash_from_bytes(b"aaaabbbbccccddddeeee"), - ] - }; - - let observed: Test = serde_json::from_str(input).unwrap(); - - assert_eq!(observed, expected); - } - - #[test] - fn test_deserialize_info_hashes_str(){ - let input = r#"{ - "info_hashes": "aaaabbbbccccddddeeee" - }"#; - - let expected = Test { - info_hashes: vec![ - info_hash_from_bytes(b"aaaabbbbccccddddeeee"), - ] - }; - - let observed: Test = serde_json::from_str(input).unwrap(); - - assert_eq!(observed, expected); - } - - #[test] - fn test_deserialize_info_hashes_null(){ - let input = r#"{ - "info_hashes": null - }"#; - - let expected = Test { - info_hashes: vec![] - }; - - let observed: Test = serde_json::from_str(input).unwrap(); - - assert_eq!(observed, expected); - } - - #[test] - fn test_deserialize_info_hashes_missing(){ - let input = r#"{}"#; - - let expected = Test { - info_hashes: vec![] - }; - - let observed: Test = serde_json::from_str(input).unwrap(); - - assert_eq!(observed, expected); - } } \ No newline at end of file diff --git a/scripts/run-aquatic-http.sh b/scripts/run-aquatic-http.sh new file mode 100755 index 0000000..a32c833 --- /dev/null +++ b/scripts/run-aquatic-http.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +export RUSTFLAGS="-C target-cpu=native" + +cargo run --release --bin aquatic_http -- $@