diff --git a/Cargo.lock b/Cargo.lock index 51ebcce..a25ba70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,6 +49,7 @@ dependencies = [ "either", "flume", "hashbrown", + "httparse", "indexmap", "log", "mimalloc", @@ -60,6 +61,7 @@ dependencies = [ "quickcheck_macros", "rand", "serde", + "serde_urlencoded", "simplelog", "socket2", ] @@ -372,6 +374,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "dtoa" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "134951f4028bdadb9b84baf4232681efbf277da25144b9b0ad65df75946c422b" + [[package]] name = "either" version = "1.5.3" @@ -1208,6 +1216,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97" +dependencies = [ + "dtoa", + "itoa", + "serde", + "url", +] + [[package]] name = "sha-1" version = "0.8.2" diff --git a/TODO.md b/TODO.md index 7c3b3e9..c1d0251 100644 --- a/TODO.md +++ b/TODO.md @@ -6,10 +6,10 @@ and maybe run scripts should be adjusted ## aquatic_http -* setup tls connection: support TLS and plain at the same time?? -* parse http requests incrementally when data comes in. crate for streaming - parse? -* serde for request/responses, also url encoded info hashes and peer id's +* handshake stuff + * support TLS and plain at the same time?? + * simplify +* test * move stuff to common crate with ws: what about Request/InMessage etc? ## aquatic_ws diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index 323dbda..5c57b97 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -20,6 +20,7 @@ aquatic_common = { path = "../aquatic_common" } either = "1" flume = "0.7" hashbrown = { version = "0.7", features = ["serde"] } +httparse = "1" indexmap = "1" log = "0.4" mimalloc = { version = "0.1", default-features = false } @@ -29,6 +30,7 @@ parking_lot = "0.10" privdrop = "0.3" rand = { version = "0.7", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } +serde_urlencoded = "0.6" socket2 = { version = "0.3", features = ["reuseport"] } simplelog = "0.8" diff --git a/aquatic_http/src/lib/network/connection.rs b/aquatic_http/src/lib/network/connection.rs index a79c8ef..6534d33 100644 --- a/aquatic_http/src/lib/network/connection.rs +++ b/aquatic_http/src/lib/network/connection.rs @@ -1,5 +1,6 @@ use std::net::{SocketAddr}; use std::io::{Read, Write}; +use std::io::ErrorKind; use either::Either; use hashbrown::HashMap; @@ -7,12 +8,9 @@ use log::info; use mio::Token; use mio::net::TcpStream; use native_tls::{TlsAcceptor, TlsStream, MidHandshakeTlsStream}; -use tungstenite::WebSocket; -use tungstenite::handshake::{MidHandshake, HandshakeError, server::NoCallback}; -use tungstenite::server::{ServerHandshake}; -use tungstenite::protocol::WebSocketConfig; use crate::common::*; +use crate::protocol::{Request, Response}; pub enum Stream { @@ -86,15 +84,100 @@ impl Write for Stream { } -enum HandshakeMachine { - TcpStream(TcpStream), - TlsStream(TlsStream), - TlsMidHandshake(MidHandshakeTlsStream), - WsMidHandshake(MidHandshake>), +#[derive(Debug)] +pub enum RequestParseError { + NeedMoreData, + Invalid, + Incomplete, + Io(::std::io::Error), + Parse(::httparse::Error) } -impl HandshakeMachine { +pub struct EstablishedConnection { + stream: Stream, + pub peer_addr: SocketAddr, + buf: Vec, +} + + +impl EstablishedConnection { + fn new(stream: Stream) -> Self { + let peer_addr = stream.get_peer_addr(); + + Self { + stream, + peer_addr, + buf: Vec::new(), // FIXME: with capacity of like 100? + } + } + + pub fn parse_request(&mut self) -> Result { + match self.stream.read(&mut self.buf){ + Ok(0) => { + // FIXME: finished reading completely here? + }, + Ok(_) => { + return Err(RequestParseError::NeedMoreData); + }, + Err(err) if err.kind() == ErrorKind::WouldBlock => { + return Err(RequestParseError::NeedMoreData); + }, + Err(err) => { + return Err(RequestParseError::Io(err)); + } + } + + let mut headers = [httparse::EMPTY_HEADER; 1]; + let mut request = httparse::Request::new(&mut headers); + + let request = match request.parse(&self.buf){ + Ok(httparse::Status::Complete(_)) => { + if let Some(request) = Request::from_http(request){ + Ok(request) + } else { + Err(RequestParseError::Invalid) + } + }, + Ok(httparse::Status::Partial) => { + Err(RequestParseError::Incomplete) + }, + Err(err) => { + Err(RequestParseError::Parse(err)) + } + }; + + self.buf.clear(); + self.buf.shrink_to_fit(); + + request + } + + pub fn send_response(&mut self, body: &str) -> Result<(), RequestParseError> { + let mut response = String::new(); + + response.push_str("200 OK\r\n\r\n"); + response.push_str(body); + + match self.stream.write(response.as_bytes()){ + Ok(_) => Ok(()), + Err(err) => { + info!("send response: {:?}", err); + + Err(RequestParseError::Io(err)) + } + } + } +} + + +enum HandshakeMachine { + TcpStream(TcpStream), + TlsMidHandshake(MidHandshakeTlsStream), +} + + +impl <'a>HandshakeMachine { #[inline] fn new(tcp_stream: TcpStream) -> Self { Self::TcpStream(tcp_stream) @@ -103,9 +186,8 @@ impl HandshakeMachine { #[inline] fn advance( self, - ws_config: WebSocketConfig, opt_tls_acceptor: &Option, // If set, run TLS - ) -> (Option>, bool) { // bool = stop looping + ) -> (Option>, bool) { // bool = stop looping match self { HandshakeMachine::TcpStream(stream) => { if let Some(tls_acceptor) = opt_tls_acceptor { @@ -113,37 +195,22 @@ impl HandshakeMachine { tls_acceptor.accept(stream) ) } else { - let handshake_result = ::tungstenite::server::accept_with_config( - Stream::TcpStream(stream), - Some(ws_config) - ); - - Self::handle_ws_handshake_result(handshake_result) + (Some(Either::Left(EstablishedConnection::new(Stream::TcpStream(stream)))), false) } }, - HandshakeMachine::TlsStream(stream) => { - let handshake_result = ::tungstenite::server::accept( - Stream::TlsStream(stream), - ); - - Self::handle_ws_handshake_result(handshake_result) - }, HandshakeMachine::TlsMidHandshake(handshake) => { Self::handle_tls_handshake_result(handshake.handshake()) }, - HandshakeMachine::WsMidHandshake(handshake) => { - Self::handle_ws_handshake_result(handshake.handshake()) - }, } } #[inline] fn handle_tls_handshake_result( result: Result, ::native_tls::HandshakeError>, - ) -> (Option>, bool) { + ) -> (Option>, bool) { match result { Ok(stream) => { - (Some(Either::Right(Self::TlsStream(stream))), false) + (Some(Either::Left(EstablishedConnection::new(Stream::TlsStream(stream)))), false) }, Err(native_tls::HandshakeError::WouldBlock(handshake)) => { (Some(Either::Right(Self::TlsMidHandshake(handshake))), true) @@ -155,76 +222,38 @@ impl HandshakeMachine { } } } - - #[inline] - fn handle_ws_handshake_result( - result: Result, HandshakeError>> , - ) -> (Option>, bool) { - match result { - Ok(mut ws) => { - let peer_addr = ws.get_mut().get_peer_addr(); - - let established_ws = EstablishedWs { - ws, - peer_addr, - }; - - (Some(Either::Left(established_ws)), false) - }, - Err(HandshakeError::Interrupted(handshake)) => { - (Some(Either::Right(HandshakeMachine::WsMidHandshake(handshake))), true) - }, - Err(HandshakeError::Failure(err)) => { - info!("ws handshake error: {}", err); - - (None, false) - } - } - } -} - - -pub struct EstablishedWs { - pub ws: WebSocket, - pub peer_addr: SocketAddr, } pub struct Connection { - ws_config: WebSocketConfig, pub valid_until: ValidUntil, - inner: Either, + inner: Either, } /// Create from TcpStream. Run `advance_handshakes` until `get_established_ws` /// returns Some(EstablishedWs). /// -/// advance_handshakes takes ownership of self because the TLS and WebSocket -/// handshake methods do. get_established_ws doesn't, since work can be done -/// on a mutable reference to a tungstenite websocket, and this way, the whole -/// Connection doesn't have to be removed from and reinserted into the -/// TorrentMap. This is also the reason for wrapping Container.inner in an -/// Either instead of combining all states into one structure just having a -/// single method for advancing handshakes and maybe returning a websocket. +/// advance_handshakes takes ownership of self because the TLS handshake +/// methods does. get_established doesn't, since work can be done on a mutable +/// reference to a tls stream, and this way, the whole connection doesn't have +/// to be removed/inserted into the ConnectionMap impl Connection { #[inline] pub fn new( - ws_config: WebSocketConfig, valid_until: ValidUntil, tcp_stream: TcpStream, ) -> Self { Self { - ws_config, valid_until, inner: Either::Right(HandshakeMachine::new(tcp_stream)) } } #[inline] - pub fn get_established_ws<'a>(&mut self) -> Option<&mut EstablishedWs> { + pub fn get_established(&mut self) -> Option<&mut EstablishedConnection> { match self.inner { - Either::Left(ref mut ews) => Some(ews), + Either::Left(ref mut established) => Some(established), Either::Right(_) => None, } } @@ -238,15 +267,11 @@ impl Connection { match self.inner { Either::Left(_) => (Some(self), false), Either::Right(machine) => { - let ws_config = self.ws_config; - let (opt_inner, stop_loop) = machine.advance( - ws_config, opt_tls_acceptor ); let opt_new_self = opt_inner.map(|inner| Self { - ws_config, valid_until, inner }); @@ -255,24 +280,7 @@ impl Connection { } } } - - #[inline] - pub fn close(&mut self){ - if let Either::Left(ref mut ews) = self.inner { - if ews.ws.can_read(){ - ews.ws.close(None).unwrap(); - - // Required after ws.close() - if let Err(err) = ews.ws.write_pending(){ - info!( - "error writing pending messages after closing ws: {}", - err - ) - } - } - } - } } -pub type ConnectionMap = HashMap; \ No newline at end of file +pub type ConnectionMap<'a> = HashMap; \ No newline at end of file diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index 7479092..417ef36 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -6,7 +6,6 @@ use log::{info, debug, error}; use native_tls::TlsAcceptor; use mio::{Events, Poll, Interest, Token}; use mio::net::TcpListener; -use tungstenite::protocol::WebSocketConfig; use crate::common::*; use crate::config::Config; @@ -24,8 +23,8 @@ pub fn run_socket_worker( config: Config, socket_worker_index: usize, socket_worker_statuses: SocketWorkerStatuses, - request_channel_sender: InMessageSender, - response_channel_receiver: OutMessageReceiver, + request_channel_sender: RequestChannelSender, + response_channel_receiver: ResponseChannelReceiver, opt_tls_acceptor: Option, ){ match create_listener(&config){ @@ -54,19 +53,14 @@ pub fn run_socket_worker( pub fn run_poll_loop( config: Config, socket_worker_index: usize, - request_channel_sender: InMessageSender, - response_channel_receiver: OutMessageReceiver, + request_channel_sender: RequestChannelSender, + response_channel_receiver: ResponseChannelReceiver, listener: ::std::net::TcpListener, opt_tls_acceptor: Option, ){ let poll_timeout = Duration::from_millis( config.network.poll_timeout_milliseconds ); - let ws_config = WebSocketConfig { - max_message_size: Some(config.network.websocket_max_message_size), - max_frame_size: Some(config.network.websocket_max_frame_size), - max_send_queue: None, - }; let mut listener = TcpListener::from_std(listener); let mut poll = Poll::new().expect("create poll"); @@ -92,7 +86,6 @@ pub fn run_poll_loop( if token.0 == 0 { accept_new_streams( - ws_config, &mut listener, &mut poll, &mut connections, @@ -111,7 +104,7 @@ pub fn run_poll_loop( } } - send_out_messages( + send_responses( response_channel_receiver.drain(), &mut connections ); @@ -128,7 +121,6 @@ pub fn run_poll_loop( // will be identical to ws version fn accept_new_streams( - ws_config: WebSocketConfig, listener: &mut TcpListener, poll: &mut Poll, connections: &mut ConnectionMap, @@ -152,7 +144,7 @@ fn accept_new_streams( .register(&mut stream, token, Interest::READABLE) .unwrap(); - let connection = Connection::new(ws_config, valid_until, stream); + let connection = Connection::new(valid_until, stream); connections.insert(token, connection); }, @@ -170,56 +162,52 @@ fn accept_new_streams( /// On the stream given by poll_token, get TLS (if requested) and tungstenite /// up and running, then read messages and pass on through channel. -pub fn run_handshake_and_read_requests( +pub fn run_handshake_and_read_requests<'a>( socket_worker_index: usize, - request_channel_sender: &InMessageSender, - opt_tls_acceptor: &Option, // If set, run TLS - connections: &mut ConnectionMap, + request_channel_sender: &RequestChannelSender, + opt_tls_acceptor: &'a Option, // If set, run TLS + connections: &'a mut ConnectionMap<'a>, poll_token: Token, valid_until: ValidUntil, ){ loop { - if let Some(established_ws) = connections.get_mut(&poll_token) - .and_then(Connection::get_established_ws) + if let Some(established_connection) = connections.get_mut(&poll_token) + .and_then(Connection::get_established) { - use ::tungstenite::Error::Io; + match established_connection.parse_request(){ + Ok(request) => { + let meta = ConnectionMeta { + worker_index: socket_worker_index, + poll_token, + peer_addr: established_connection.peer_addr + }; - match established_ws.ws.read_message(){ - Ok(ws_message) => { - if let Some(in_message) = InMessage::from_ws_message(ws_message){ - let meta = ConnectionMeta { - worker_index: socket_worker_index, - poll_token, - peer_addr: established_ws.peer_addr - }; + debug!("read message"); - debug!("read message"); - - if let Err(err) = request_channel_sender - .send((meta, in_message)) - { - error!( - "InMessageSender: couldn't send message: {:?}", - err - ); - } + if let Err(err) = request_channel_sender + .send((meta, request)) + { + error!( + "RequestChannelSender: couldn't send message: {:?}", + err + ); } }, - Err(Io(err)) if err.kind() == ErrorKind::WouldBlock => { + Err(RequestParseError::NeedMoreData) => { break; }, - Err(tungstenite::Error::ConnectionClosed) => { - remove_connection_if_exists(connections, poll_token); - - break - }, - Err(err) => { + Err(RequestParseError::Io(err)) => { info!("error reading messages: {}", err); remove_connection_if_exists(connections, poll_token); break; - } + }, + Err(e) => { + info!("error reading request: {:?}", e); + + break; + }, } } else if let Some(connection) = connections.remove(&poll_token){ let (opt_new_connection, stop_loop) = connection.advance_handshakes( @@ -240,39 +228,37 @@ pub fn run_handshake_and_read_requests( /// Read messages from channel, send to peers -pub fn send_out_messages( - response_channel_receiver: ::flume::Drain<(ConnectionMeta, OutMessage)>, +pub fn send_responses( + response_channel_receiver: ::flume::Drain<(ConnectionMeta, Response)>, connections: &mut ConnectionMap, ){ - for (meta, out_message) in response_channel_receiver { - let opt_established_ws = connections.get_mut(&meta.poll_token) - .and_then(Connection::get_established_ws); + for (meta, response) in response_channel_receiver { + let opt_established = connections.get_mut(&meta.poll_token) + .and_then(Connection::get_established); - if let Some(established_ws) = opt_established_ws { - if established_ws.peer_addr != meta.peer_addr { + if let Some(established) = opt_established { + if established.peer_addr != meta.peer_addr { info!("socket worker error: peer socket addrs didn't match"); continue; } - - use ::tungstenite::Error::Io; - match established_ws.ws.write_message(out_message.to_ws_message()){ + match established.send_response(&response.to_http_string()){ Ok(()) => { debug!("sent message"); }, - Err(Io(err)) if err.kind() == ErrorKind::WouldBlock => {}, - Err(tungstenite::Error::ConnectionClosed) => { - remove_connection_if_exists(connections, meta.poll_token); - }, - Err(err) => { - info!("error writing ws message: {}", err); + Err(RequestParseError::NeedMoreData) => {}, // FIXME: block? + Err(RequestParseError::Io(err)) => { + info!("error sending response: {}", err); remove_connection_if_exists( connections, meta.poll_token ); }, + _ => { + unreachable!() + } } } } diff --git a/aquatic_http/src/lib/network/utils.rs b/aquatic_http/src/lib/network/utils.rs index b10112d..a99e8db 100644 --- a/aquatic_http/src/lib/network/utils.rs +++ b/aquatic_http/src/lib/network/utils.rs @@ -48,7 +48,7 @@ pub fn remove_connection_if_exists( token: Token, ){ if let Some(mut connection) = connections.remove(&token){ - connection.close(); + // connection.close(); // FIXME } } @@ -62,7 +62,7 @@ pub fn remove_inactive_connections( connections.retain(|_, connection| { if connection.valid_until.0 < now { - connection.close(); + // connection.close(); // FIXME false } else { diff --git a/aquatic_http/src/lib/protocol/mod.rs b/aquatic_http/src/lib/protocol/mod.rs index bf61d6e..ed02800 100644 --- a/aquatic_http/src/lib/protocol/mod.rs +++ b/aquatic_http/src/lib/protocol/mod.rs @@ -4,18 +4,18 @@ use serde::{Serialize, Deserialize}; use crate::common::Peer; -mod serde_helpers; +// mod serde_helpers; -use serde_helpers::*; +// use serde_helpers::*; #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] #[serde(transparent)] pub struct PeerId( - #[serde( - deserialize_with = "deserialize_20_bytes", - serialize_with = "serialize_20_bytes" - )] + // #[serde( + // deserialize_with = "deserialize_20_bytes", + // serialize_with = "serialize_20_bytes" + // )] pub [u8; 20] ); @@ -23,10 +23,10 @@ pub struct PeerId( #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] #[serde(transparent)] pub struct InfoHash( - #[serde( - deserialize_with = "deserialize_20_bytes", - serialize_with = "serialize_20_bytes" - )] + // #[serde( + // deserialize_with = "deserialize_20_bytes", + // serialize_with = "serialize_20_bytes" + // )] pub [u8; 20] ); @@ -102,11 +102,7 @@ pub struct AnnounceResponseFailure { #[derive(Debug, Clone, Deserialize)] pub struct ScrapeRequest { - #[serde( - rename = "info_hash", - deserialize_with = "deserialize_info_hashes", - default - )] + #[serde(rename = "info_hash")] pub info_hashes: Vec, } @@ -133,8 +129,17 @@ pub enum Request { impl Request { - pub fn from_http() -> Self { - unimplemented!() + pub fn from_http(http: httparse::Request) -> Option { + http.path + .and_then(|path| { + let mut iterator = path.splitn(2, '?'); + + iterator.next(); + iterator.next() + }) + .and_then(|query_string| { + serde_urlencoded::from_str(query_string).ok() + }) } } @@ -144,4 +149,11 @@ pub enum Response { AnnounceSuccess(AnnounceResponseSuccess), AnnounceFailure(AnnounceResponseFailure), Scrape(ScrapeResponse) +} + + +impl Response { + pub fn to_http_string(self) -> String { + unimplemented!() + } } \ No newline at end of file