From 10806522822951e7a53c553ae083d794be63bc89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 7 May 2020 01:08:29 +0200 Subject: [PATCH] WIP: start work on webtorrent support --- Cargo.lock | 376 +++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + aquatic_ws/Cargo.toml | 35 ++++ aquatic_ws/src/bin/main.rs | 216 +++++++++++++++++++++ 4 files changed, 628 insertions(+) create mode 100644 aquatic_ws/Cargo.toml create mode 100644 aquatic_ws/src/bin/main.rs diff --git a/Cargo.lock b/Cargo.lock index 8b8b62d..bced461 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -78,6 +78,29 @@ dependencies = [ "serde", ] +[[package]] +name = "aquatic_ws" +version = "0.1.0" +dependencies = [ + "bittorrent_udp", + "cli_helpers", + "crossbeam-channel", + "hashbrown", + "histogram", + "indexmap", + "mimalloc", + "mio", + "net2", + "parking_lot", + "privdrop", + "quickcheck", + "quickcheck_macros", + "rand", + "serde", + "slab", + "tungstenite", +] + [[package]] name = "arrayvec" version = "0.4.12" @@ -136,6 +159,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" +[[package]] +name = "base64" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" + [[package]] name = "bitflags" version = "1.2.1" @@ -151,12 +180,45 @@ dependencies = [ "quickcheck_macros", ] +[[package]] +name = "block-buffer" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b" +dependencies = [ + "block-padding", + "byte-tools", + "byteorder", + "generic-array", +] + +[[package]] +name = "block-padding" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5" +dependencies = [ + "byte-tools", +] + +[[package]] +name = "byte-tools" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" + [[package]] name = "byteorder" version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" +[[package]] +name = "bytes" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" + [[package]] name = "cc" version = "1.0.52" @@ -214,6 +276,22 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "core-foundation" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d24c7a13c43e870e37c1556b74555437870a04514f7685f5b354e090567171" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac" + [[package]] name = "crossbeam-channel" version = "0.4.2" @@ -235,6 +313,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "digest" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" +dependencies = [ + "generic-array", +] + [[package]] name = "encode_unicode" version = "0.3.6" @@ -251,6 +338,42 @@ dependencies = [ "regex", ] +[[package]] +name = "fake-simd" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" + +[[package]] +name = "fnv" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + +[[package]] +name = "generic-array" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" +dependencies = [ + "typenum", +] + [[package]] name = "getrandom" version = "0.1.14" @@ -298,12 +421,40 @@ version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669" +[[package]] +name = "http" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "httparse" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9" + [[package]] name = "humansize" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6cab2627acfc432780848602f3f558f7e9dd427352224b0d9324025796d2a5e" +[[package]] +name = "idna" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9" +dependencies = [ + "matches", + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indexmap" version = "1.3.2" @@ -325,6 +476,15 @@ dependencies = [ "regex", ] +[[package]] +name = "input_buffer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754" +dependencies = [ + "bytes", +] + [[package]] name = "itoa" version = "0.4.5" @@ -370,6 +530,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "matches" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" + [[package]] name = "maybe-uninit" version = "2.0.0" @@ -415,6 +581,24 @@ dependencies = [ "winapi", ] +[[package]] +name = "native-tls" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b0d88c06fe90d5ee94048ba40409ef1d9315d86f6f38c2efdaad4fb50c58b2d" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "net2" version = "0.2.33" @@ -557,6 +741,45 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a" +[[package]] +name = "opaque-debug" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" + +[[package]] +name = "openssl" +version = "0.10.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cee6d85f4cb4c4f59a6a85d5b68a233d280c82e29e822913b9c8b129fbf20bdd" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "lazy_static", + "libc", + "openssl-sys", +] + +[[package]] +name = "openssl-probe" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" + +[[package]] +name = "openssl-sys" +version = "0.9.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7717097d810a0f2e2323f9e5d11e71608355e24828410b55b9d4f18aa5f9a5d8" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "parking_lot" version = "0.10.2" @@ -581,6 +804,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "percent-encoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + +[[package]] +name = "pkg-config" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677" + [[package]] name = "plotly" version = "0.4.1" @@ -736,18 +971,60 @@ version = "0.6.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fe5bd57d1d7414c6b5ed48563a2c855d995ff777729dcd91c369ec7fea395ae" +[[package]] +name = "remove_dir_all" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e" +dependencies = [ + "winapi", +] + [[package]] name = "ryu" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed3d612bc64430efeb3f7ee6ef26d590dce0c43249217bddc62112540c7941e1" +[[package]] +name = "schannel" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "039c25b130bd8c1321ee2d7de7fde2659fa9c2744e4bb29711cfc852ea53cd19" +dependencies = [ + "lazy_static", + "winapi", +] + [[package]] name = "scopeguard" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "security-framework" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f331b9025654145cd425b9ded0caf8f5ae0df80d418b326e2dc1c3dc5eb0620" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17bf11d99252f512695eb468de5516e5cf75455521e69dfe343f3b74e4748405" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.106" @@ -779,6 +1056,24 @@ dependencies = [ "serde", ] +[[package]] +name = "sha-1" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df" +dependencies = [ + "block-buffer", + "digest", + "fake-simd", + "opaque-debug", +] + +[[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" + [[package]] name = "smallvec" version = "1.4.0" @@ -808,6 +1103,20 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tempfile" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" +dependencies = [ + "cfg-if", + "libc", + "rand", + "redox_syscall", + "remove_dir_all", + "winapi", +] + [[package]] name = "terminal_size" version = "0.1.11" @@ -845,6 +1154,50 @@ dependencies = [ "serde", ] +[[package]] +name = "tungstenite" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfea31758bf674f990918962e8e5f07071a3161bd7c4138ed23e416e1ac4264e" +dependencies = [ + "base64", + "byteorder", + "bytes", + "http", + "httparse", + "input_buffer", + "log", + "native-tls", + "rand", + "sha-1", + "url", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "373c8a200f9e67a0c95e62a4f52fbf80c23b4381c05a17845531982fa99e6b33" + +[[package]] +name = "unicode-bidi" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" +dependencies = [ + "matches", +] + +[[package]] +name = "unicode-normalization" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4" +dependencies = [ + "smallvec", +] + [[package]] name = "unicode-width" version = "0.1.7" @@ -857,6 +1210,29 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" +[[package]] +name = "url" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb" +dependencies = [ + "idna", + "matches", + "percent-encoding", +] + +[[package]] +name = "utf-8" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" + +[[package]] +name = "vcpkg" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fc439f2794e98976c88a2a2dafce96b930fe8010b0a256b3c2199a773933168" + [[package]] name = "version_check" version = "0.9.1" diff --git a/Cargo.toml b/Cargo.toml index bd7e9ce..35e70b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "aquatic", "aquatic_bench", "aquatic_load_test", + "aquatic_ws", "bittorrent_udp", "cli_helpers", ] diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml new file mode 100644 index 0000000..c7f54c6 --- /dev/null +++ b/aquatic_ws/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "aquatic_ws" +version = "0.1.0" +authors = ["Joakim FrostegÄrd "] +edition = "2018" +license = "Apache-2.0" + +# [lib] +# name = "aquatic_ws" +# path = "src/lib/lib.rs" + +[[bin]] +name = "aquatic_ws" +path = "src/bin/main.rs" + +[dependencies] +bittorrent_udp = { path = "../bittorrent_udp" } +cli_helpers = { path = "../cli_helpers" } +crossbeam-channel = "0.4" +hashbrown = "0.7" +histogram = "0.6" +indexmap = "1" +mimalloc = { version = "0.1", default-features = false } +mio = { version = "0.7", features = ["tcp", "os-poll", "os-util"] } +net2 = "0.2" +parking_lot = "0.10" +privdrop = "0.3" +rand = { version = "0.7", features = ["small_rng"] } +serde = { version = "1", features = ["derive"] } +slab = "0.4" +tungstenite = "0.10" + +[dev-dependencies] +quickcheck = "0.9" +quickcheck_macros = "0.9" \ No newline at end of file diff --git a/aquatic_ws/src/bin/main.rs b/aquatic_ws/src/bin/main.rs new file mode 100644 index 0000000..5903872 --- /dev/null +++ b/aquatic_ws/src/bin/main.rs @@ -0,0 +1,216 @@ +//! There is not much point in doing more work until more clarity on +//! exact protocol is achieved + +use std::net::SocketAddr; +use std::time::{Duration, Instant}; +use std::io::ErrorKind; +use std::option::Option; + +use slab::Slab; +use tungstenite::{Message, WebSocket}; + +use mio::{Events, Poll, Interest, Token}; +use mio::net::{TcpListener, TcpStream}; + + +pub struct PeerConnection { + pub ws: WebSocket, + pub peer_socket_addr: SocketAddr, + pub valid_until: Instant, +} + + +/// First thoughts on what to send to handler +pub struct HandlerMessage { + /// Index of socket worker that read this request. Required for sending + /// back response through correct channel to correct worker. + pub socket_worker_index: usize, + /// FIXME: Should this be parsed request? + pub message: T, + /// SocketAddr of peer + pub peer_socket_addr: SocketAddr, + /// Slab index of PeerConnection + pub peer_connection_index: usize, +} + + +fn run_network_worker(){ + let address: SocketAddr = "0.0.0.0:3000".parse().unwrap(); + + let mut listener = TcpListener::bind(address).unwrap(); + let mut poll = Poll::new().expect("create poll"); + + poll.registry() + .register(&mut listener, Token(0), Interest::READABLE) + .unwrap(); + + let mut events = Events::with_capacity(1024); + + let timeout = Duration::from_millis(50); + + let mut connections: Slab> = Slab::new(); + + // Insert empty first entry to prevent assignment of index 0 + assert_eq!(connections.insert(None), 0); + + loop { + poll.poll(&mut events, Some(timeout)) + .expect("failed polling"); + + let valid_until = Instant::now() + Duration::from_secs(600); + + for event in events.iter(){ + let token = event.token(); + + if token.0 == 0 { + loop { + match listener.accept(){ + Ok((mut stream, src)) => { + let entry = connections.vacant_entry(); + let token = Token(entry.key()); + + poll.registry() + .register(&mut stream, token, Interest::READABLE) + .unwrap(); + + // FIXME: will this cause issues due to blocking? + // Should handshake be started manually below + // instead? + let ws = tungstenite::server::accept(stream).unwrap(); + + let peer_connection = PeerConnection { + ws, + peer_socket_addr: src, + valid_until, + }; + + entry.insert(Some(peer_connection)); + }, + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + break + } + + eprint!("{}", err); + } + } + } + } else if event.is_readable(){ + loop { + if let Some(Some(connection)) = connections.get_mut(token.0){ + match connection.ws.read_message(){ + Ok(message) => { + // FIXME: parse message, send to handler + // through channel (flume?) + + connection.valid_until = valid_until; + }, + Err(tungstenite::Error::Io(err)) => { + if err.kind() == ErrorKind::WouldBlock { + break + } + + eprint!("{}", err); + }, + Err(tungstenite::Error::ConnectionClosed) => { + // FIXME: necessary? + poll.registry().deregister(connection.ws.get_mut()).unwrap(); + + connections.remove(token.0); + }, + Err(err) => { + eprint!("{}", err); + } + } + } + } + } + } + + let now = Instant::now(); + + // Close connections after some time of inactivity and write pending + // messages (which is required after closing anyway.) + // + // FIXME: peers need to be removed too, wherever they are stored + connections.retain(|_, opt_connection| { + if let Some(connection) = opt_connection { + if connection.valid_until < now { + connection.ws.close(None).unwrap(); + } + + loop { + match connection.ws.write_pending(){ + Err(tungstenite::Error::Io(err)) => { + if err.kind() == ErrorKind::WouldBlock { + break + } + }, + Err(tungstenite::Error::ConnectionClosed) => { + // FIXME: necessary? + poll.registry() + .deregister(connection.ws.get_mut()) + .unwrap(); + + return false; + }, + _ => {} + } + } + } + + true + }); + + // TODO: loop through responses from channel, write them to wss or + // possibly register ws as writable (but this means event capacity + // must be adjusted accordingy and is limiting) + + // How should IP's be handled? Send index and src to processing and + // lookup on return that entry is correct. Old ideas: + // Maybe use IndexMap and use numerical + // index for token? Removing element from IndexMap requires shifting + // or swapping indeces, so not very good. + for _ in 0..100 { + let connection_index = 1; + let peer_socket_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + let message = Message::Text("test".to_string()); + + let opt_connection = connections + .get_mut(connection_index); + + if let Some(Some(connection)) = opt_connection { + if connection.peer_socket_addr != peer_socket_addr { + continue; + } + + match connection.ws.write_message(message){ + Ok(()) => {}, + Err(tungstenite::Error::Io(err)) => { + if err.kind() == ErrorKind::WouldBlock { + continue; + } + + eprint!("{}", err); + }, + Err(tungstenite::Error::ConnectionClosed) => { + // FIXME: necessary? + poll.registry() + .deregister(connection.ws.get_mut()) + .unwrap(); + + connections.remove(connection_index); + }, + Err(err) => { + eprint!("{}", err); + }, + } + } + } + } +} + + +fn main(){ + run_network_worker(); +} \ No newline at end of file