aquatic_http: continue implementation work

This commit is contained in:
Joakim Frostegård 2020-07-02 13:21:39 +02:00
parent 76079cf66e
commit a487347a0d
6 changed files with 96 additions and 171 deletions

View file

@ -7,9 +7,11 @@
## aquatic_http ## aquatic_http
* handshake stuff * handshake stuff
* fix overcomplicated and probably incorrect implementation
* support TLS and plain at the same time?? * support TLS and plain at the same time??
* simplify * response serialization, https://crates.io/crates/bendy
* test * response content length
* scrape info hash parsing
* move stuff to common crate with ws: what about Request/InMessage etc? * move stuff to common crate with ws: what about Request/InMessage etc?
## aquatic_ws ## aquatic_ws

View file

@ -88,7 +88,6 @@ impl Write for Stream {
pub enum RequestParseError { pub enum RequestParseError {
NeedMoreData, NeedMoreData,
Invalid, Invalid,
Incomplete,
Io(::std::io::Error), Io(::std::io::Error),
Parse(::httparse::Error) Parse(::httparse::Error)
} }
@ -97,7 +96,8 @@ pub enum RequestParseError {
pub struct EstablishedConnection { pub struct EstablishedConnection {
stream: Stream, stream: Stream,
pub peer_addr: SocketAddr, pub peer_addr: SocketAddr,
buf: Vec<u8>, buf: [u8; 1024],
bytes_read: usize,
} }
@ -108,17 +108,17 @@ impl EstablishedConnection {
Self { Self {
stream, stream,
peer_addr, peer_addr,
buf: Vec::new(), // FIXME: with capacity of like 100? buf: [0; 1024], // FIXME: fixed size is stupid
bytes_read: 0,
} }
} }
pub fn parse_request(&mut self) -> Result<Request, RequestParseError> { pub fn parse_request(&mut self) -> Result<Request, RequestParseError> {
match self.stream.read(&mut self.buf){ match self.stream.read(&mut self.buf[self.bytes_read..]){
Ok(0) => { Ok(bytes_read) => {
// FIXME: finished reading completely here? self.bytes_read += bytes_read;
},
Ok(_) => { info!("parse request read {} bytes", bytes_read);
return Err(RequestParseError::NeedMoreData);
}, },
Err(err) if err.kind() == ErrorKind::WouldBlock => { Err(err) if err.kind() == ErrorKind::WouldBlock => {
return Err(RequestParseError::NeedMoreData); return Err(RequestParseError::NeedMoreData);
@ -128,45 +128,49 @@ impl EstablishedConnection {
} }
} }
let mut headers = [httparse::EMPTY_HEADER; 1]; if self.bytes_read == 0 {
return Err(RequestParseError::NeedMoreData); // FIXME: ???
}
let mut headers = [httparse::EMPTY_HEADER; 16];
let mut request = httparse::Request::new(&mut headers); let mut request = httparse::Request::new(&mut headers);
let request = match request.parse(&self.buf){ let request = match request.parse(&self.buf[..self.bytes_read]){
Ok(httparse::Status::Complete(_)) => { Ok(httparse::Status::Complete(_)) => {
if let Some(request) = Request::from_http(request){ let result = if let Some(request) = Request::from_http(request){
Ok(request) Ok(request)
} else { } else {
Err(RequestParseError::Invalid) Err(RequestParseError::Invalid)
} };
self.bytes_read = 0;
result
}, },
Ok(httparse::Status::Partial) => { Ok(httparse::Status::Partial) => {
Err(RequestParseError::Incomplete) Err(RequestParseError::NeedMoreData)
}, },
Err(err) => { Err(err) => {
self.bytes_read = 0;
Err(RequestParseError::Parse(err)) Err(RequestParseError::Parse(err))
} }
}; };
self.buf.clear();
self.buf.shrink_to_fit();
request request
} }
pub fn send_response(&mut self, body: &str) -> Result<(), RequestParseError> { pub fn send_response(&mut self, body: &str) -> ::std::io::Result<()> {
let mut response = String::new(); let mut response = String::new();
response.push_str("200 OK\r\n\r\n"); response.push_str("HTTP/1.1 200 OK\r\n\r\n"); // FIXME: content-length
response.push_str(body); response.push_str(body);
response.push_str("\r\n");
match self.stream.write(response.as_bytes()){ self.stream.write(response.as_bytes())?;
Ok(_) => Ok(()), self.stream.flush()?;
Err(err) => {
info!("send response: {:?}", err);
Err(RequestParseError::Io(err)) Ok(())
}
}
} }
} }
@ -195,7 +199,13 @@ impl <'a>HandshakeMachine {
tls_acceptor.accept(stream) tls_acceptor.accept(stream)
) )
} else { } else {
(Some(Either::Left(EstablishedConnection::new(Stream::TcpStream(stream)))), false) log::debug!("established connection");
let established_connection = EstablishedConnection::new(
Stream::TcpStream(stream)
);
(Some(Either::Left(established_connection)), false)
} }
}, },
HandshakeMachine::TlsMidHandshake(handshake) => { HandshakeMachine::TlsMidHandshake(handshake) => {

View file

@ -182,7 +182,7 @@ pub fn run_handshake_and_read_requests<'a>(
peer_addr: established_connection.peer_addr peer_addr: established_connection.peer_addr
}; };
debug!("read message"); debug!("read request, sending to handler");
if let Err(err) = request_channel_sender if let Err(err) = request_channel_sender
.send((meta, request)) .send((meta, request))
@ -192,12 +192,16 @@ pub fn run_handshake_and_read_requests<'a>(
err err
); );
} }
break
}, },
Err(RequestParseError::NeedMoreData) => { Err(RequestParseError::NeedMoreData) => {
info!("need more data");
break; break;
}, },
Err(RequestParseError::Io(err)) => { Err(RequestParseError::Io(err)) => {
info!("error reading messages: {}", err); info!("error reading request: {}", err);
remove_connection_if_exists(connections, poll_token); remove_connection_if_exists(connections, poll_token);
@ -205,6 +209,8 @@ pub fn run_handshake_and_read_requests<'a>(
}, },
Err(e) => { Err(e) => {
info!("error reading request: {:?}", e); info!("error reading request: {:?}", e);
remove_connection_if_exists(connections, poll_token);
break; break;
}, },
@ -245,10 +251,17 @@ pub fn send_responses(
match established.send_response(&response.to_http_string()){ match established.send_response(&response.to_http_string()){
Ok(()) => { Ok(()) => {
debug!("sent message"); debug!("sent response");
remove_connection_if_exists(
connections,
meta.poll_token
);
}, },
Err(RequestParseError::NeedMoreData) => {}, // FIXME: block? Err(err) if err.kind() == ErrorKind::WouldBlock => {
Err(RequestParseError::Io(err)) => { debug!("send response: would block");
},
Err(err) => {
info!("error sending response: {}", err); info!("error sending response: {}", err);
remove_connection_if_exists( remove_connection_if_exists(
@ -256,9 +269,6 @@ pub fn send_responses(
meta.poll_token meta.poll_token
); );
}, },
_ => {
unreachable!()
}
} }
} }
} }

View file

@ -9,25 +9,25 @@ use crate::common::Peer;
// use serde_helpers::*; // use serde_helpers::*;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)] #[serde(transparent)]
pub struct PeerId( pub struct PeerId(
// #[serde( // #[serde(
// deserialize_with = "deserialize_20_bytes", // deserialize_with = "deserialize_20_bytes",
// serialize_with = "serialize_20_bytes" // serialize_with = "serialize_20_bytes"
// )] // )]
pub [u8; 20] pub String
); );
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)] #[serde(transparent)]
pub struct InfoHash( pub struct InfoHash(
// #[serde( // #[serde(
// deserialize_with = "deserialize_20_bytes", // deserialize_with = "deserialize_20_bytes",
// serialize_with = "serialize_20_bytes" // serialize_with = "serialize_20_bytes"
// )] // )]
pub [u8; 20] pub String
); );
@ -76,8 +76,8 @@ pub struct AnnounceRequest {
pub bytes_left: usize, pub bytes_left: usize,
#[serde(default)] #[serde(default)]
pub event: AnnounceEvent, pub event: AnnounceEvent,
/// FIXME: number: 0 or 1 /// FIXME: number: 0 or 1 to bool
pub compact: bool, pub compact: u8,
/// Requested number of peers to return /// Requested number of peers to return
pub numwant: usize, pub numwant: usize,
} }
@ -130,16 +130,32 @@ pub enum Request {
impl Request { impl Request {
pub fn from_http(http: httparse::Request) -> Option<Self> { pub fn from_http(http: httparse::Request) -> Option<Self> {
http.path log::debug!("path: {:?}", http.path);
.and_then(|path| {
let mut iterator = path.splitn(2, '?');
iterator.next(); let path = http.path?;
iterator.next()
}) let mut split_parts= path.splitn(2, '?');
.and_then(|query_string| {
serde_urlencoded::from_str(query_string).ok() let path = split_parts.next()?;
}) let query_string = split_parts.next()?;
if path == "/announce" {
let result: Result<AnnounceRequest, serde_urlencoded::de::Error> = serde_urlencoded::from_str(query_string);
if let Err(ref err) = result {
log::debug!("error: {}", err);
}
result.ok().map(Request::Announce)
} else {
let result: Result<ScrapeRequest, serde_urlencoded::de::Error> = serde_urlencoded::from_str(query_string);
if let Err(ref err) = result {
log::debug!("error: {}", err);
}
result.ok().map(Request::Scrape)
}
} }
} }
@ -154,6 +170,6 @@ pub enum Response {
impl Response { impl Response {
pub fn to_http_string(self) -> String { pub fn to_http_string(self) -> String {
unimplemented!() "(response)".to_string()
} }
} }

View file

@ -126,122 +126,4 @@ pub fn deserialize_info_hashes<'de, D>(
where D: Deserializer<'de>, where D: Deserializer<'de>,
{ {
Ok(deserializer.deserialize_any(InfoHashVecVisitor).unwrap_or_default()) Ok(deserializer.deserialize_any(InfoHashVecVisitor).unwrap_or_default())
}
#[cfg(test)]
mod tests {
use serde::Deserialize;
use super::*;
fn info_hash_from_bytes(bytes: &[u8]) -> InfoHash {
let mut arr = [0u8; 20];
assert!(bytes.len() == 20);
arr.copy_from_slice(&bytes[..]);
InfoHash(arr)
}
#[test]
fn test_deserialize_20_bytes(){
let input = r#""aaaabbbbccccddddeeee""#;
let expected = info_hash_from_bytes(b"aaaabbbbccccddddeeee");
let observed: InfoHash = serde_json::from_str(input).unwrap();
assert_eq!(observed, expected);
let input = r#""aaaabbbbccccddddeee""#;
let res_info_hash: Result<InfoHash, _> = serde_json::from_str(input);
assert!(res_info_hash.is_err());
let input = r#""aaaabbbbccccddddeee𝕊""#;
let res_info_hash: Result<InfoHash, _> = serde_json::from_str(input);
assert!(res_info_hash.is_err());
}
#[test]
fn test_serde_20_bytes(){
let info_hash = info_hash_from_bytes(b"aaaabbbbccccddddeeee");
let out = serde_json::to_string(&info_hash).unwrap();
let info_hash_2 = serde_json::from_str(&out).unwrap();
assert_eq!(info_hash, info_hash_2);
}
#[derive(Debug, PartialEq, Eq, Deserialize)]
struct Test {
#[serde(deserialize_with = "deserialize_info_hashes", default)]
info_hashes: Vec<InfoHash>,
}
#[test]
fn test_deserialize_info_hashes_vec(){
let input = r#"{
"info_hashes": ["aaaabbbbccccddddeeee", "aaaabbbbccccddddeeee"]
}"#;
let expected = Test {
info_hashes: vec![
info_hash_from_bytes(b"aaaabbbbccccddddeeee"),
info_hash_from_bytes(b"aaaabbbbccccddddeeee"),
]
};
let observed: Test = serde_json::from_str(input).unwrap();
assert_eq!(observed, expected);
}
#[test]
fn test_deserialize_info_hashes_str(){
let input = r#"{
"info_hashes": "aaaabbbbccccddddeeee"
}"#;
let expected = Test {
info_hashes: vec![
info_hash_from_bytes(b"aaaabbbbccccddddeeee"),
]
};
let observed: Test = serde_json::from_str(input).unwrap();
assert_eq!(observed, expected);
}
#[test]
fn test_deserialize_info_hashes_null(){
let input = r#"{
"info_hashes": null
}"#;
let expected = Test {
info_hashes: vec![]
};
let observed: Test = serde_json::from_str(input).unwrap();
assert_eq!(observed, expected);
}
#[test]
fn test_deserialize_info_hashes_missing(){
let input = r#"{}"#;
let expected = Test {
info_hashes: vec![]
};
let observed: Test = serde_json::from_str(input).unwrap();
assert_eq!(observed, expected);
}
} }

5
scripts/run-aquatic-http.sh Executable file
View file

@ -0,0 +1,5 @@
#!/bin/sh
export RUSTFLAGS="-C target-cpu=native"
cargo run --release --bin aquatic_http -- $@