mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-02 18:55:32 +00:00
WIP: aquatic_ws: start work on data structures, data flow
This commit is contained in:
parent
0d835452c1
commit
cf75a07a7e
8 changed files with 503 additions and 222 deletions
84
Cargo.lock
generated
84
Cargo.lock
generated
|
|
@ -84,7 +84,7 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bittorrent_udp",
|
"bittorrent_udp",
|
||||||
"cli_helpers",
|
"cli_helpers",
|
||||||
"crossbeam-channel",
|
"flume",
|
||||||
"hashbrown",
|
"hashbrown",
|
||||||
"histogram",
|
"histogram",
|
||||||
"indexmap",
|
"indexmap",
|
||||||
|
|
@ -344,6 +344,16 @@ version = "0.1.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
|
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "flume"
|
||||||
|
version = "0.7.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "855e285c3835897065a6ba6f9463b44553eb9f29c7988d692f3d41283b47388b"
|
||||||
|
dependencies = [
|
||||||
|
"futures",
|
||||||
|
"spin",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fnv"
|
name = "fnv"
|
||||||
version = "1.0.6"
|
version = "1.0.6"
|
||||||
|
|
@ -365,6 +375,66 @@ version = "0.1.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
|
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures"
|
||||||
|
version = "0.3.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780"
|
||||||
|
dependencies = [
|
||||||
|
"futures-channel",
|
||||||
|
"futures-core",
|
||||||
|
"futures-io",
|
||||||
|
"futures-sink",
|
||||||
|
"futures-task",
|
||||||
|
"futures-util",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-channel"
|
||||||
|
version = "0.3.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f0c77d04ce8edd9cb903932b608268b3fffec4163dc053b3b402bf47eac1f1a8"
|
||||||
|
dependencies = [
|
||||||
|
"futures-core",
|
||||||
|
"futures-sink",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-core"
|
||||||
|
version = "0.3.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-io"
|
||||||
|
version = "0.3.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a638959aa96152c7a4cddf50fcb1e3fede0583b27157c26e67d6f99904090dc6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-sink"
|
||||||
|
version = "0.3.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3466821b4bc114d95b087b850a724c6f83115e929bc88f1fa98a3304a944c8a6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-task"
|
||||||
|
version = "0.3.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7b0a34e53cf6cdcd0178aa573aed466b646eb3db769570841fda0c7ede375a27"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "futures-util"
|
||||||
|
version = "0.3.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5"
|
||||||
|
dependencies = [
|
||||||
|
"futures-core",
|
||||||
|
"futures-sink",
|
||||||
|
"futures-task",
|
||||||
|
"pin-utils",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "generic-array"
|
name = "generic-array"
|
||||||
version = "0.12.3"
|
version = "0.12.3"
|
||||||
|
|
@ -810,6 +880,12 @@ version = "2.1.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
|
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pin-utils"
|
||||||
|
version = "0.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pkg-config"
|
name = "pkg-config"
|
||||||
version = "0.3.17"
|
version = "0.3.17"
|
||||||
|
|
@ -1092,6 +1168,12 @@ dependencies = [
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "spin"
|
||||||
|
version = "0.5.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "syn"
|
name = "syn"
|
||||||
version = "1.0.18"
|
version = "1.0.18"
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ path = "src/bin/main.rs"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bittorrent_udp = { path = "../bittorrent_udp" }
|
bittorrent_udp = { path = "../bittorrent_udp" }
|
||||||
cli_helpers = { path = "../cli_helpers" }
|
cli_helpers = { path = "../cli_helpers" }
|
||||||
crossbeam-channel = "0.4"
|
flume = "0.7"
|
||||||
hashbrown = "0.7"
|
hashbrown = "0.7"
|
||||||
histogram = "0.6"
|
histogram = "0.6"
|
||||||
indexmap = "1"
|
indexmap = "1"
|
||||||
|
|
|
||||||
|
|
@ -2,5 +2,5 @@ use aquatic_ws;
|
||||||
|
|
||||||
|
|
||||||
fn main(){
|
fn main(){
|
||||||
aquatic_ws::run_network_worker();
|
aquatic_ws::run();
|
||||||
}
|
}
|
||||||
83
aquatic_ws/src/lib/common.rs
Normal file
83
aquatic_ws/src/lib/common.rs
Normal file
|
|
@ -0,0 +1,83 @@
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
use flume::{Sender, Receiver};
|
||||||
|
use hashbrown::HashMap;
|
||||||
|
use indexmap::IndexMap;
|
||||||
|
|
||||||
|
use crate::protocol::*;
|
||||||
|
|
||||||
|
|
||||||
|
pub struct ValidUntil(pub Instant);
|
||||||
|
|
||||||
|
|
||||||
|
pub struct Peer {
|
||||||
|
pub peer_id: PeerId,
|
||||||
|
pub complete: bool,
|
||||||
|
pub valid_until: ValidUntil,
|
||||||
|
|
||||||
|
// FIXME: these three could probably be replaced with MessageMeta
|
||||||
|
pub socket_worker_index: usize,
|
||||||
|
pub socket_addr: SocketAddr,
|
||||||
|
pub connection_index: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub type PeerMap = IndexMap<PeerId, Peer>;
|
||||||
|
|
||||||
|
|
||||||
|
pub struct TorrentData {
|
||||||
|
pub peers: PeerMap,
|
||||||
|
pub seeders: usize,
|
||||||
|
pub leechers: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub type TorrentMap = HashMap<InfoHash, TorrentData>;
|
||||||
|
|
||||||
|
|
||||||
|
pub struct State {
|
||||||
|
pub torrents: TorrentMap,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl Default for State {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
torrents: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub struct MessageMeta {
|
||||||
|
/// Index of socket worker that read this request. Required for sending
|
||||||
|
/// back response through correct channel to correct worker.
|
||||||
|
pub socket_worker_index: usize,
|
||||||
|
/// SocketAddr of peer
|
||||||
|
pub peer_socket_addr: SocketAddr,
|
||||||
|
/// Slab index of PeerConnection
|
||||||
|
pub peer_connection_index: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub type InMessageSender = Sender<(MessageMeta, InMessage)>;
|
||||||
|
pub type InMessageReceiver = Receiver<(MessageMeta, InMessage)>;
|
||||||
|
pub type OutMessageReceiver = Receiver<(MessageMeta, OutMessage)>;
|
||||||
|
|
||||||
|
|
||||||
|
pub struct OutMessageSender(Vec<Sender<(MessageMeta, OutMessage)>>);
|
||||||
|
|
||||||
|
|
||||||
|
impl OutMessageSender {
|
||||||
|
pub fn new(senders: Vec<Sender<(MessageMeta, OutMessage)>>) -> Self {
|
||||||
|
Self(senders)
|
||||||
|
}
|
||||||
|
pub fn send(
|
||||||
|
&self,
|
||||||
|
meta: MessageMeta,
|
||||||
|
message: OutMessage
|
||||||
|
){
|
||||||
|
self.0[meta.socket_worker_index].send((meta, message));
|
||||||
|
}
|
||||||
|
}
|
||||||
47
aquatic_ws/src/lib/handler.rs
Normal file
47
aquatic_ws/src/lib/handler.rs
Normal file
|
|
@ -0,0 +1,47 @@
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use hashbrown::HashMap;
|
||||||
|
|
||||||
|
use crate::common::*;
|
||||||
|
use crate::protocol::*;
|
||||||
|
|
||||||
|
|
||||||
|
pub fn run_request_worker(
|
||||||
|
state: State,
|
||||||
|
in_message_receiver: InMessageReceiver,
|
||||||
|
out_message_sender: OutMessageSender,
|
||||||
|
){
|
||||||
|
let mut in_messages = Vec::new();
|
||||||
|
let mut out_messages = Vec::new();
|
||||||
|
|
||||||
|
let timeout = Duration::from_micros(200);
|
||||||
|
|
||||||
|
for i in 0..1000 {
|
||||||
|
if i == 0 {
|
||||||
|
if let Ok((meta, in_message)) = in_message_receiver.recv(){
|
||||||
|
in_messages.push((meta, in_message));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let res_in_message = in_message_receiver.recv_timeout(timeout);
|
||||||
|
|
||||||
|
if let Ok((meta, in_message)) = res_in_message {
|
||||||
|
in_messages.push((meta, in_message));
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
for (meta, in_message) in in_messages.drain(..){
|
||||||
|
let out_message = OutMessage::ScrapeResponse(ScrapeResponse {
|
||||||
|
files: HashMap::new(),
|
||||||
|
flags: HashMap::new(),
|
||||||
|
});
|
||||||
|
|
||||||
|
out_messages.push((meta, out_message));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (meta, out_message) in out_messages.drain(..){
|
||||||
|
out_message_sender.send(meta, out_message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,213 +1,48 @@
|
||||||
//! There is not much point in doing more work until more clarity on
|
//! There is not much point in doing more work until more clarity on
|
||||||
//! exact protocol is achieved
|
//! exact protocol is achieved
|
||||||
|
|
||||||
use std::net::{SocketAddr};
|
pub mod common;
|
||||||
use std::time::{Duration, Instant};
|
pub mod handler;
|
||||||
use std::io::ErrorKind;
|
pub mod network;
|
||||||
use std::option::Option;
|
|
||||||
|
|
||||||
use slab::Slab;
|
|
||||||
use tungstenite::{Message, WebSocket};
|
|
||||||
|
|
||||||
use mio::{Events, Poll, Interest, Token};
|
|
||||||
use mio::net::{TcpListener, TcpStream};
|
|
||||||
|
|
||||||
pub mod protocol;
|
pub mod protocol;
|
||||||
|
|
||||||
|
use common::*;
|
||||||
|
|
||||||
pub struct PeerConnection {
|
|
||||||
pub ws: WebSocket<TcpStream>,
|
pub fn run(){
|
||||||
pub peer_socket_addr: SocketAddr,
|
let state = State::default();
|
||||||
pub valid_until: Instant,
|
|
||||||
|
let (in_message_sender, in_message_receiver): (InMessageSender, InMessageReceiver) = ::flume::unbounded();
|
||||||
|
|
||||||
|
let mut out_message_senders = Vec::new();
|
||||||
|
|
||||||
|
for i in 0..2 {
|
||||||
|
let in_message_sender = in_message_sender.clone();
|
||||||
|
|
||||||
|
let (out_message_sender, out_message_receiver) = ::flume::unbounded();
|
||||||
|
|
||||||
|
out_message_senders.push(out_message_sender);
|
||||||
|
|
||||||
|
::std::thread::spawn(move || {
|
||||||
|
network::run_socket_worker(
|
||||||
|
i,
|
||||||
|
in_message_sender,
|
||||||
|
out_message_receiver,
|
||||||
|
);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let out_message_sender = OutMessageSender::new(out_message_senders);
|
||||||
|
|
||||||
/// First thoughts on what to send to handler
|
::std::thread::spawn(move || {
|
||||||
pub struct HandlerMessage<T> {
|
handler::run_request_worker(
|
||||||
/// Index of socket worker that read this request. Required for sending
|
state,
|
||||||
/// back response through correct channel to correct worker.
|
in_message_receiver,
|
||||||
pub socket_worker_index: usize,
|
out_message_sender,
|
||||||
/// FIXME: Should this be parsed request?
|
);
|
||||||
pub message: T,
|
|
||||||
/// SocketAddr of peer
|
|
||||||
pub peer_socket_addr: SocketAddr,
|
|
||||||
/// Slab index of PeerConnection
|
|
||||||
pub peer_connection_index: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
pub fn run_network_worker(){
|
|
||||||
let address: SocketAddr = "0.0.0.0:3000".parse().unwrap();
|
|
||||||
|
|
||||||
let mut listener = TcpListener::bind(address).unwrap();
|
|
||||||
let mut poll = Poll::new().expect("create poll");
|
|
||||||
|
|
||||||
poll.registry()
|
|
||||||
.register(&mut listener, Token(0), Interest::READABLE)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let mut events = Events::with_capacity(1024);
|
|
||||||
|
|
||||||
let timeout = Duration::from_millis(50);
|
|
||||||
|
|
||||||
let mut connections: Slab<Option<PeerConnection>> = Slab::new();
|
|
||||||
|
|
||||||
// Insert empty first entry to prevent assignment of index 0
|
|
||||||
assert_eq!(connections.insert(None), 0);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
poll.poll(&mut events, Some(timeout))
|
|
||||||
.expect("failed polling");
|
|
||||||
|
|
||||||
let valid_until = Instant::now() + Duration::from_secs(600);
|
|
||||||
|
|
||||||
for event in events.iter(){
|
|
||||||
let token = event.token();
|
|
||||||
|
|
||||||
if token.0 == 0 {
|
|
||||||
loop {
|
|
||||||
match listener.accept(){
|
|
||||||
Ok((mut stream, src)) => {
|
|
||||||
let entry = connections.vacant_entry();
|
|
||||||
let token = Token(entry.key());
|
|
||||||
|
|
||||||
poll.registry()
|
|
||||||
.register(&mut stream, token, Interest::READABLE)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// FIXME: will this cause issues due to blocking?
|
|
||||||
// Should handshake be started manually below
|
|
||||||
// instead?
|
|
||||||
let ws = tungstenite::server::accept(stream).unwrap();
|
|
||||||
|
|
||||||
let peer_connection = PeerConnection {
|
|
||||||
ws,
|
|
||||||
peer_socket_addr: src,
|
|
||||||
valid_until,
|
|
||||||
};
|
|
||||||
|
|
||||||
entry.insert(Some(peer_connection));
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
if err.kind() == ErrorKind::WouldBlock {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
eprint!("{}", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if event.is_readable(){
|
|
||||||
loop {
|
|
||||||
if let Some(Some(connection)) = connections.get_mut(token.0){
|
|
||||||
match connection.ws.read_message(){
|
|
||||||
Ok(message) => {
|
|
||||||
// FIXME: parse message, send to handler
|
|
||||||
// through channel (flume?)
|
|
||||||
|
|
||||||
connection.valid_until = valid_until;
|
|
||||||
},
|
|
||||||
Err(tungstenite::Error::Io(err)) => {
|
|
||||||
if err.kind() == ErrorKind::WouldBlock {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
eprint!("{}", err);
|
|
||||||
},
|
|
||||||
Err(tungstenite::Error::ConnectionClosed) => {
|
|
||||||
// FIXME: necessary?
|
|
||||||
poll.registry().deregister(connection.ws.get_mut()).unwrap();
|
|
||||||
|
|
||||||
connections.remove(token.0);
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
eprint!("{}", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let now = Instant::now();
|
|
||||||
|
|
||||||
// Close connections after some time of inactivity and write pending
|
|
||||||
// messages (which is required after closing anyway.)
|
|
||||||
//
|
|
||||||
// FIXME: peers need to be removed too, wherever they are stored
|
|
||||||
connections.retain(|_, opt_connection| {
|
|
||||||
if let Some(connection) = opt_connection {
|
|
||||||
if connection.valid_until < now {
|
|
||||||
connection.ws.close(None).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match connection.ws.write_pending(){
|
|
||||||
Err(tungstenite::Error::Io(err)) => {
|
|
||||||
if err.kind() == ErrorKind::WouldBlock {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(tungstenite::Error::ConnectionClosed) => {
|
|
||||||
// FIXME: necessary?
|
|
||||||
poll.registry()
|
|
||||||
.deregister(connection.ws.get_mut())
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
return false;
|
|
||||||
},
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
true
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// TODO: loop through responses from channel, write them to wss or
|
loop {
|
||||||
// possibly register ws as writable (but this means event capacity
|
|
||||||
// must be adjusted accordingy and is limiting)
|
|
||||||
|
|
||||||
// How should IP's be handled? Send index and src to processing and
|
|
||||||
// lookup on return that entry is correct. Old ideas:
|
|
||||||
// Maybe use IndexMap<SocketAddr, WebSocket> and use numerical
|
|
||||||
// index for token? Removing element from IndexMap requires shifting
|
|
||||||
// or swapping indeces, so not very good.
|
|
||||||
for _ in 0..100 {
|
|
||||||
let connection_index = 1;
|
|
||||||
let peer_socket_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
|
|
||||||
let message = Message::Text("test".to_string());
|
|
||||||
|
|
||||||
let opt_connection = connections
|
|
||||||
.get_mut(connection_index);
|
|
||||||
|
|
||||||
if let Some(Some(connection)) = opt_connection {
|
|
||||||
if connection.peer_socket_addr != peer_socket_addr {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
match connection.ws.write_message(message){
|
|
||||||
Ok(()) => {},
|
|
||||||
Err(tungstenite::Error::Io(err)) => {
|
|
||||||
if err.kind() == ErrorKind::WouldBlock {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
eprint!("{}", err);
|
|
||||||
},
|
|
||||||
Err(tungstenite::Error::ConnectionClosed) => {
|
|
||||||
// FIXME: necessary?
|
|
||||||
poll.registry()
|
|
||||||
.deregister(connection.ws.get_mut())
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
connections.remove(connection_index);
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
eprint!("{}", err);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
214
aquatic_ws/src/lib/network.rs
Normal file
214
aquatic_ws/src/lib/network.rs
Normal file
|
|
@ -0,0 +1,214 @@
|
||||||
|
use std::net::{SocketAddr};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
use std::io::ErrorKind;
|
||||||
|
use std::option::Option;
|
||||||
|
|
||||||
|
use slab::Slab;
|
||||||
|
use tungstenite::{Message, WebSocket};
|
||||||
|
|
||||||
|
use mio::{Events, Poll, Interest, Token};
|
||||||
|
use mio::net::{TcpListener, TcpStream};
|
||||||
|
|
||||||
|
use crate::common::*;
|
||||||
|
use crate::protocol::*;
|
||||||
|
|
||||||
|
|
||||||
|
pub struct PeerConnection {
|
||||||
|
pub ws: WebSocket<TcpStream>,
|
||||||
|
pub peer_socket_addr: SocketAddr,
|
||||||
|
pub valid_until: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn run_socket_worker(
|
||||||
|
socket_worker_index: usize,
|
||||||
|
in_message_sender: InMessageSender,
|
||||||
|
out_message_receiver: OutMessageReceiver,
|
||||||
|
){
|
||||||
|
let address: SocketAddr = "0.0.0.0:3000".parse().unwrap();
|
||||||
|
|
||||||
|
let mut listener = TcpListener::bind(address).unwrap();
|
||||||
|
let mut poll = Poll::new().expect("create poll");
|
||||||
|
|
||||||
|
poll.registry()
|
||||||
|
.register(&mut listener, Token(0), Interest::READABLE)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut events = Events::with_capacity(1024);
|
||||||
|
|
||||||
|
let timeout = Duration::from_millis(50);
|
||||||
|
|
||||||
|
let mut connections: Slab<Option<PeerConnection>> = Slab::new();
|
||||||
|
|
||||||
|
// Insert empty first entry to prevent assignment of index 0
|
||||||
|
assert_eq!(connections.insert(None), 0);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
poll.poll(&mut events, Some(timeout))
|
||||||
|
.expect("failed polling");
|
||||||
|
|
||||||
|
let valid_until = Instant::now() + Duration::from_secs(600);
|
||||||
|
|
||||||
|
for event in events.iter(){
|
||||||
|
let token = event.token();
|
||||||
|
|
||||||
|
if token.0 == 0 {
|
||||||
|
loop {
|
||||||
|
match listener.accept(){
|
||||||
|
Ok((mut stream, src)) => {
|
||||||
|
let entry = connections.vacant_entry();
|
||||||
|
let token = Token(entry.key());
|
||||||
|
|
||||||
|
poll.registry()
|
||||||
|
.register(&mut stream, token, Interest::READABLE)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// FIXME: will this cause issues due to blocking?
|
||||||
|
// Should handshake be started manually below
|
||||||
|
// instead?
|
||||||
|
let ws = tungstenite::server::accept(stream).unwrap();
|
||||||
|
|
||||||
|
let peer_connection = PeerConnection {
|
||||||
|
ws,
|
||||||
|
peer_socket_addr: src,
|
||||||
|
valid_until,
|
||||||
|
};
|
||||||
|
|
||||||
|
entry.insert(Some(peer_connection));
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
if err.kind() == ErrorKind::WouldBlock {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
eprint!("{}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if event.is_readable(){
|
||||||
|
loop {
|
||||||
|
if let Some(Some(connection)) = connections.get_mut(token.0){
|
||||||
|
match connection.ws.read_message(){
|
||||||
|
Ok(message) => {
|
||||||
|
// FIXME: parse message, send to handler
|
||||||
|
// through channel (flume?)
|
||||||
|
|
||||||
|
let meta = MessageMeta {
|
||||||
|
socket_worker_index,
|
||||||
|
peer_connection_index: token.0,
|
||||||
|
peer_socket_addr: connection.peer_socket_addr
|
||||||
|
};
|
||||||
|
|
||||||
|
let in_message = InMessage::ScrapeRequest(ScrapeRequest {
|
||||||
|
info_hashes: None,
|
||||||
|
});
|
||||||
|
|
||||||
|
in_message_sender.send((meta, in_message));
|
||||||
|
|
||||||
|
connection.valid_until = valid_until;
|
||||||
|
},
|
||||||
|
Err(tungstenite::Error::Io(err)) => {
|
||||||
|
if err.kind() == ErrorKind::WouldBlock {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
eprint!("{}", err);
|
||||||
|
},
|
||||||
|
Err(tungstenite::Error::ConnectionClosed) => {
|
||||||
|
// FIXME: necessary?
|
||||||
|
poll.registry().deregister(connection.ws.get_mut()).unwrap();
|
||||||
|
|
||||||
|
connections.remove(token.0);
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
eprint!("{}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
// Close connections after some time of inactivity and write pending
|
||||||
|
// messages (which is required after closing anyway.)
|
||||||
|
//
|
||||||
|
// FIXME: peers need to be removed too, wherever they are stored
|
||||||
|
connections.retain(|_, opt_connection| {
|
||||||
|
if let Some(connection) = opt_connection {
|
||||||
|
if connection.valid_until < now {
|
||||||
|
connection.ws.close(None).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match connection.ws.write_pending(){
|
||||||
|
Err(tungstenite::Error::Io(err)) => {
|
||||||
|
if err.kind() == ErrorKind::WouldBlock {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(tungstenite::Error::ConnectionClosed) => {
|
||||||
|
// FIXME: necessary?
|
||||||
|
poll.registry()
|
||||||
|
.deregister(connection.ws.get_mut())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
return false;
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
true
|
||||||
|
});
|
||||||
|
|
||||||
|
// TODO: loop through responses from channel, write them to wss or
|
||||||
|
// possibly register ws as writable (but this means event capacity
|
||||||
|
// must be adjusted accordingy and is limiting)
|
||||||
|
|
||||||
|
// How should IP's be handled? Send index and src to processing and
|
||||||
|
// lookup on return that entry is correct. Old ideas:
|
||||||
|
// Maybe use IndexMap<SocketAddr, WebSocket> and use numerical
|
||||||
|
// index for token? Removing element from IndexMap requires shifting
|
||||||
|
// or swapping indeces, so not very good.
|
||||||
|
|
||||||
|
for (meta, out_message) in out_message_receiver.drain(){
|
||||||
|
let message = Message::Text("test".to_string());
|
||||||
|
|
||||||
|
let opt_connection = connections
|
||||||
|
.get_mut(meta.peer_connection_index);
|
||||||
|
|
||||||
|
if let Some(Some(connection)) = opt_connection {
|
||||||
|
if connection.peer_socket_addr != meta.peer_socket_addr {
|
||||||
|
eprintln!("socket worker: peer socket addrs didn't match");
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
match connection.ws.write_message(message){
|
||||||
|
Ok(()) => {},
|
||||||
|
Err(tungstenite::Error::Io(err)) => {
|
||||||
|
if err.kind() == ErrorKind::WouldBlock {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
eprint!("{}", err);
|
||||||
|
},
|
||||||
|
Err(tungstenite::Error::ConnectionClosed) => {
|
||||||
|
// FIXME: necessary?
|
||||||
|
poll.registry()
|
||||||
|
.deregister(connection.ws.get_mut())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
connections.remove(meta.peer_connection_index);
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
eprint!("{}", err);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,15 +1,19 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
|
|
||||||
|
use hashbrown::HashMap;
|
||||||
|
|
||||||
/// TODO: will need to store socket worker index and connection index
|
|
||||||
/// for middleman activities, also save SocketAddr instead of IP and port,
|
pub struct PeerId(pub [u8; 20]);
|
||||||
/// maybe, for comparison in socket worker
|
|
||||||
pub struct Peer {
|
|
||||||
pub complete: bool, // bytes_left == 0
|
pub struct InfoHash(pub [u8; 20]);
|
||||||
pub peer_id: [u8; 20],
|
|
||||||
|
|
||||||
|
pub struct ResponsePeer {
|
||||||
|
pub peer_id: PeerId,
|
||||||
pub ip: IpAddr, // From src socket addr
|
pub ip: IpAddr, // From src socket addr
|
||||||
pub port: u16, // From src port
|
pub port: u16, // From src port
|
||||||
|
pub complete: bool, // bytes_left == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -21,14 +25,21 @@ pub enum AnnounceEvent {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl Default for AnnounceEvent {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Update
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Apparently, these are sent to a number of peers when they are set
|
/// Apparently, these are sent to a number of peers when they are set
|
||||||
/// in an AnnounceRequest
|
/// in an AnnounceRequest
|
||||||
/// action = "announce"
|
/// action = "announce"
|
||||||
pub struct MiddlemanOfferToPeer {
|
pub struct MiddlemanOfferToPeer {
|
||||||
|
pub peer_id: PeerId, // Peer id of peer sending offer
|
||||||
|
pub info_hash: InfoHash,
|
||||||
pub offer: (), // Gets copied from AnnounceRequestOffer
|
pub offer: (), // Gets copied from AnnounceRequestOffer
|
||||||
pub offer_id: (), // Gets copied from AnnounceRequestOffer
|
pub offer_id: (), // Gets copied from AnnounceRequestOffer
|
||||||
pub peer_id: [u8; 20], // Peer id of peer sending offer
|
|
||||||
pub info_hash: [u8; 20],
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -36,10 +47,10 @@ pub struct MiddlemanOfferToPeer {
|
||||||
/// peer id == "to_peer_id" field
|
/// peer id == "to_peer_id" field
|
||||||
/// Action field should be 'announce'
|
/// Action field should be 'announce'
|
||||||
pub struct MiddlemanAnswerToPeer {
|
pub struct MiddlemanAnswerToPeer {
|
||||||
|
pub peer_id: PeerId,
|
||||||
|
pub info_hash: InfoHash,
|
||||||
pub answer: bool,
|
pub answer: bool,
|
||||||
pub offer_id: (),
|
pub offer_id: (),
|
||||||
pub peer_id: [u8; 20],
|
|
||||||
pub info_hash: [u8; 20],
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -51,8 +62,8 @@ pub struct AnnounceRequestOffer {
|
||||||
|
|
||||||
|
|
||||||
pub struct AnnounceRequest {
|
pub struct AnnounceRequest {
|
||||||
pub info_hash: [u8; 20], // FIXME: I think these are actually really just strings with 20 len, same with peer id
|
pub info_hash: InfoHash, // FIXME: I think these are actually really just strings with 20 len, same with peer id
|
||||||
pub peer_id: [u8; 20],
|
pub peer_id: PeerId,
|
||||||
pub bytes_left: bool, // Just called "left" in protocol
|
pub bytes_left: bool, // Just called "left" in protocol
|
||||||
pub event: AnnounceEvent, // Can be empty? Then, default is "update"
|
pub event: AnnounceEvent, // Can be empty? Then, default is "update"
|
||||||
|
|
||||||
|
|
@ -63,31 +74,30 @@ pub struct AnnounceRequest {
|
||||||
/// If false, send response before sending offers (or possibly "skip sending update back"?)
|
/// If false, send response before sending offers (or possibly "skip sending update back"?)
|
||||||
/// If true, send MiddlemanAnswerToPeer to peer with "to_peer_id" as peer_id.
|
/// If true, send MiddlemanAnswerToPeer to peer with "to_peer_id" as peer_id.
|
||||||
pub answer: bool,
|
pub answer: bool,
|
||||||
pub to_peer_id: Option<[u8; 20]>, // Only parsed to hex if answer == true, probably undefined otherwise
|
pub to_peer_id: Option<PeerId>, // Only parsed to hex if answer == true, probably undefined otherwise
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub struct AnnounceResponse {
|
pub struct AnnounceResponse {
|
||||||
pub info_hash: [u8; 20],
|
pub info_hash: InfoHash,
|
||||||
pub complete: usize,
|
pub complete: usize,
|
||||||
pub incomplete: usize,
|
pub incomplete: usize,
|
||||||
// I suspect receivers don't care about this and rely on offers instead??
|
// I suspect receivers don't care about this and rely on offers instead??
|
||||||
// Also, what does it contain, exacly?
|
// Also, what does it contain, exacly (not certain that it is ResponsePeer?)
|
||||||
pub peers: Vec<()>,
|
pub peers: Vec<ResponsePeer>,
|
||||||
pub interval: usize, // Default 2 min probably
|
pub interval: usize, // Default 2 min probably
|
||||||
|
|
||||||
// Sent to "to_peer_id" peer (?? or did I put this into MiddlemanAnswerToPeer instead?)
|
// Sent to "to_peer_id" peer (?? or did I put this into MiddlemanAnswerToPeer instead?)
|
||||||
pub offer_id: (),
|
// pub offer_id: (),
|
||||||
pub answer: bool,
|
// pub answer: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
pub struct ScrapeRequest {
|
pub struct ScrapeRequest {
|
||||||
// If omitted, scrape for all torrents, apparently
|
// If omitted, scrape for all torrents, apparently
|
||||||
// There is some kind of parsing here too which accepts a single info hash
|
// There is some kind of parsing here too which accepts a single info hash
|
||||||
// and puts it into a vector
|
// and puts it into a vector
|
||||||
pub info_hashes: Option<Vec<[u8; 20]>>,
|
pub info_hashes: Option<Vec<InfoHash>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -99,10 +109,20 @@ pub struct ScrapeStatistics {
|
||||||
|
|
||||||
|
|
||||||
pub struct ScrapeResponse {
|
pub struct ScrapeResponse {
|
||||||
pub files: HashMap<[u8; 20], ScrapeStatistics>, // InfoHash to Scrape stats
|
pub files: HashMap<InfoHash, ScrapeStatistics>, // InfoHash to Scrape stats
|
||||||
pub flags: HashMap<String, usize>,
|
pub flags: HashMap<String, usize>,
|
||||||
}
|
}
|
||||||
//pub struct ScrapeResponse {
|
|
||||||
// pub complete: usize,
|
|
||||||
// pub incomplete: usize,
|
pub enum InMessage {
|
||||||
//}
|
AnnounceRequest(AnnounceRequest),
|
||||||
|
ScrapeRequest(ScrapeRequest),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub enum OutMessage {
|
||||||
|
AnnounceResponse(AnnounceResponse),
|
||||||
|
ScrapeResponse(ScrapeResponse),
|
||||||
|
Offer(MiddlemanOfferToPeer),
|
||||||
|
Answer(MiddlemanAnswerToPeer),
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue