diff --git a/Cargo.lock b/Cargo.lock index 04171bb..3f16950 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -54,6 +54,16 @@ dependencies = [ "rand", ] +[[package]] +name = "aquatic_common_tcp" +version = "0.1.0" +dependencies = [ + "anyhow", + "aquatic_common", + "mio", + "native-tls", +] + [[package]] name = "aquatic_http" version = "0.1.0" @@ -61,6 +71,7 @@ dependencies = [ "anyhow", "aquatic_cli_helpers", "aquatic_common", + "aquatic_common_tcp", "bendy", "either", "flume", diff --git a/Cargo.toml b/Cargo.toml index bf89258..32107a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "aquatic_cli_helpers", "aquatic_common", + "aquatic_common_tcp", "aquatic_http", "aquatic_udp", "aquatic_udp_bench", diff --git a/TODO.md b/TODO.md index 62725f4..589c87a 100644 --- a/TODO.md +++ b/TODO.md @@ -6,6 +6,7 @@ and maybe run scripts should be adjusted ## aquatic_http +* move stuff to common crate with ws: what about Request/InMessage etc? * handshake stuff * fix overcomplicated and probably incorrect implementation * support TLS and plain at the same time?? @@ -13,7 +14,6 @@ * fixed size buffer is probably bad * compact peer representation in announce response: is implementation correct? * scrape info hash parsing: multiple ought to be accepted -* move stuff to common crate with ws: what about Request/InMessage etc? * info hashes, peer ids: check that whole deserialization and url decoding works as it should. There are suspicously many `\u{fffd}` diff --git a/aquatic_common_tcp/Cargo.toml b/aquatic_common_tcp/Cargo.toml new file mode 100644 index 0000000..f9d3d0d --- /dev/null +++ b/aquatic_common_tcp/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "aquatic_common_tcp" +version = "0.1.0" +authors = ["Joakim FrostegÄrd "] +edition = "2018" +license = "Apache-2.0" + +[lib] +name = "aquatic_common_tcp" + +[dependencies] +anyhow = "1" +aquatic_common = { path = "../aquatic_common" } +mio = { version = "0.7", features = ["tcp", "os-poll", "os-util"] } +native-tls = "0.2" \ No newline at end of file diff --git a/aquatic_common_tcp/src/config.rs b/aquatic_common_tcp/src/config.rs new file mode 100644 index 0000000..e69de29 diff --git a/aquatic_common_tcp/src/lib.rs b/aquatic_common_tcp/src/lib.rs new file mode 100644 index 0000000..b0e8d58 --- /dev/null +++ b/aquatic_common_tcp/src/lib.rs @@ -0,0 +1,2 @@ +pub mod config; +pub mod network; \ No newline at end of file diff --git a/aquatic_common_tcp/src/network/mod.rs b/aquatic_common_tcp/src/network/mod.rs new file mode 100644 index 0000000..bb01fe9 --- /dev/null +++ b/aquatic_common_tcp/src/network/mod.rs @@ -0,0 +1 @@ +pub mod stream; \ No newline at end of file diff --git a/aquatic_common_tcp/src/network/stream.rs b/aquatic_common_tcp/src/network/stream.rs new file mode 100644 index 0000000..a094115 --- /dev/null +++ b/aquatic_common_tcp/src/network/stream.rs @@ -0,0 +1,76 @@ +use std::net::{SocketAddr}; +use std::io::{Read, Write}; + +use mio::net::TcpStream; +use native_tls::TlsStream; + + +pub enum Stream { + TcpStream(TcpStream), + TlsStream(TlsStream), +} + + +impl Stream { + #[inline] + pub fn get_peer_addr(&self) -> SocketAddr { + match self { + Self::TcpStream(stream) => stream.peer_addr().unwrap(), + Self::TlsStream(stream) => stream.get_ref().peer_addr().unwrap(), + } + } +} + + +impl Read for Stream { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + match self { + Self::TcpStream(stream) => stream.read(buf), + Self::TlsStream(stream) => stream.read(buf), + } + } + + /// Not used but provided for completeness + #[inline] + fn read_vectored( + &mut self, + bufs: &mut [::std::io::IoSliceMut<'_>] + ) -> ::std::io::Result { + match self { + Self::TcpStream(stream) => stream.read_vectored(bufs), + Self::TlsStream(stream) => stream.read_vectored(bufs), + } + } +} + + +impl Write for Stream { + #[inline] + fn write(&mut self, buf: &[u8]) -> ::std::io::Result { + match self { + Self::TcpStream(stream) => stream.write(buf), + Self::TlsStream(stream) => stream.write(buf), + } + } + + /// Not used but provided for completeness + #[inline] + fn write_vectored( + &mut self, + bufs: &[::std::io::IoSlice<'_>] + ) -> ::std::io::Result { + match self { + Self::TcpStream(stream) => stream.write_vectored(bufs), + Self::TlsStream(stream) => stream.write_vectored(bufs), + } + } + + #[inline] + fn flush(&mut self) -> ::std::io::Result<()> { + match self { + Self::TcpStream(stream) => stream.flush(), + Self::TlsStream(stream) => stream.flush(), + } + } +} \ No newline at end of file diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index c8f2e13..aa44523 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -17,6 +17,7 @@ path = "src/bin/main.rs" anyhow = "1" aquatic_cli_helpers = { path = "../aquatic_cli_helpers" } aquatic_common = { path = "../aquatic_common" } +aquatic_common_tcp = { path = "../aquatic_common_tcp" } bendy = { version = "0.3", features = ["std", "serde"] } either = "1" flume = "0.7" diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index 3ac09a4..cb91c2b 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -1,4 +1,4 @@ -use std::net::{SocketAddr, IpAddr}; +use std::net::SocketAddr; use std::sync::Arc; use flume::{Sender, Receiver}; @@ -34,7 +34,7 @@ pub enum PeerStatus { } -// identical to ws version - FIXME only if bytes left is optional +// almost identical to ws version - FIXME only if bytes left is optional impl PeerStatus { /// Determine peer status from announce event and number of bytes left. /// diff --git a/aquatic_http/src/lib/network/connection.rs b/aquatic_http/src/lib/network/connection.rs index 852044f..03def2c 100644 --- a/aquatic_http/src/lib/network/connection.rs +++ b/aquatic_http/src/lib/network/connection.rs @@ -1,6 +1,6 @@ use std::net::{SocketAddr}; -use std::io::{Read, Write}; use std::io::ErrorKind; +use std::io::{Read, Write}; use either::Either; use hashbrown::HashMap; @@ -9,81 +9,12 @@ use mio::Token; use mio::net::TcpStream; use native_tls::{TlsAcceptor, TlsStream, MidHandshakeTlsStream}; +use aquatic_common_tcp::network::stream::Stream; + use crate::common::*; use crate::protocol::{Request, Response}; -pub enum Stream { - TcpStream(TcpStream), - TlsStream(TlsStream), -} - - -impl Stream { - #[inline] - pub fn get_peer_addr(&self) -> SocketAddr { - match self { - Self::TcpStream(stream) => stream.peer_addr().unwrap(), - Self::TlsStream(stream) => stream.get_ref().peer_addr().unwrap(), - } - } -} - - -impl Read for Stream { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> Result { - match self { - Self::TcpStream(stream) => stream.read(buf), - Self::TlsStream(stream) => stream.read(buf), - } - } - - /// Not used but provided for completeness - #[inline] - fn read_vectored( - &mut self, - bufs: &mut [::std::io::IoSliceMut<'_>] - ) -> ::std::io::Result { - match self { - Self::TcpStream(stream) => stream.read_vectored(bufs), - Self::TlsStream(stream) => stream.read_vectored(bufs), - } - } -} - - -impl Write for Stream { - #[inline] - fn write(&mut self, buf: &[u8]) -> ::std::io::Result { - match self { - Self::TcpStream(stream) => stream.write(buf), - Self::TlsStream(stream) => stream.write(buf), - } - } - - /// Not used but provided for completeness - #[inline] - fn write_vectored( - &mut self, - bufs: &[::std::io::IoSlice<'_>] - ) -> ::std::io::Result { - match self { - Self::TcpStream(stream) => stream.write_vectored(bufs), - Self::TlsStream(stream) => stream.write_vectored(bufs), - } - } - - #[inline] - fn flush(&mut self) -> ::std::io::Result<()> { - match self { - Self::TcpStream(stream) => stream.flush(), - Self::TlsStream(stream) => stream.flush(), - } - } -} - - #[derive(Debug)] pub enum RequestParseError { NeedMoreData, @@ -296,4 +227,4 @@ impl Connection { } -pub type ConnectionMap<'a> = HashMap; \ No newline at end of file +pub type ConnectionMap = HashMap; \ No newline at end of file diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index 0c54503..3506ea7 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -1,3 +1,6 @@ +pub mod connection; +pub mod utils; + use std::time::Duration; use std::io::ErrorKind; @@ -11,13 +14,50 @@ use crate::common::*; use crate::config::Config; use crate::protocol::*; -pub mod connection; -pub mod utils; - use connection::*; use utils::*; + +fn accept_new_streams( + listener: &mut TcpListener, + poll: &mut Poll, + connections: &mut ConnectionMap, + valid_until: ValidUntil, + poll_token_counter: &mut Token, +){ + loop { + match listener.accept(){ + Ok((mut stream, _)) => { + poll_token_counter.0 = poll_token_counter.0.wrapping_add(1); + + if poll_token_counter.0 == 0 { + poll_token_counter.0 = 1; + } + + let token = *poll_token_counter; + + remove_connection_if_exists(connections, token); + + poll.registry() + .register(&mut stream, token, Interest::READABLE) + .unwrap(); + + let connection = Connection::new(valid_until, stream); + + connections.insert(token, connection); + }, + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + break + } + + info!("error while accepting streams: {}", err); + } + } + } +} + // will be almost identical to ws version pub fn run_socket_worker( config: Config, @@ -119,54 +159,14 @@ pub fn run_poll_loop( } -// will be identical to ws version -fn accept_new_streams( - listener: &mut TcpListener, - poll: &mut Poll, - connections: &mut ConnectionMap, - valid_until: ValidUntil, - poll_token_counter: &mut Token, -){ - loop { - match listener.accept(){ - Ok((mut stream, _)) => { - poll_token_counter.0 = poll_token_counter.0.wrapping_add(1); - - if poll_token_counter.0 == 0 { - poll_token_counter.0 = 1; - } - - let token = *poll_token_counter; - - remove_connection_if_exists(connections, token); - - poll.registry() - .register(&mut stream, token, Interest::READABLE) - .unwrap(); - - let connection = Connection::new(valid_until, stream); - - connections.insert(token, connection); - }, - Err(err) => { - if err.kind() == ErrorKind::WouldBlock { - break - } - - info!("error while accepting streams: {}", err); - } - } - } -} - /// On the stream given by poll_token, get TLS (if requested) and tungstenite /// up and running, then read messages and pass on through channel. -pub fn run_handshake_and_read_requests<'a>( +pub fn run_handshake_and_read_requests( socket_worker_index: usize, request_channel_sender: &RequestChannelSender, - opt_tls_acceptor: &'a Option, // If set, run TLS - connections: &'a mut ConnectionMap<'a>, + opt_tls_acceptor: &Option, // If set, run TLS + connections: &mut ConnectionMap, poll_token: Token, valid_until: ValidUntil, ){ diff --git a/aquatic_http/src/lib/network/utils.rs b/aquatic_http/src/lib/network/utils.rs index a99e8db..08e57a2 100644 --- a/aquatic_http/src/lib/network/utils.rs +++ b/aquatic_http/src/lib/network/utils.rs @@ -6,10 +6,10 @@ use socket2::{Socket, Domain, Type, Protocol}; use crate::config::Config; -use super::connection::*; +use super::*; -// will be identical to ws version +// will be almost identical to ws version pub fn create_listener( config: &Config ) -> ::anyhow::Result<::std::net::TcpListener> { @@ -38,7 +38,6 @@ pub fn create_listener( } -// will be identical to ws version /// Don't bother with deregistering from Poll. In my understanding, this is /// done automatically when the stream is dropped, as long as there are no /// other references to the file descriptor, such as when it is accessed @@ -53,7 +52,6 @@ pub fn remove_connection_if_exists( } -// will be identical to ws version // Close and remove inactive connections pub fn remove_inactive_connections( connections: &mut ConnectionMap, diff --git a/aquatic_http/src/lib/protocol/mod.rs b/aquatic_http/src/lib/protocol/mod.rs index 66f7252..7deaa86 100644 --- a/aquatic_http/src/lib/protocol/mod.rs +++ b/aquatic_http/src/lib/protocol/mod.rs @@ -1,6 +1,6 @@ use std::net::IpAddr; use hashbrown::HashMap; -use serde::{Serialize, Deserialize, Serializer}; +use serde::{Serialize, Deserialize}; use crate::common::Peer;