From cf75a07a7e8e193b80361f3d7236718fa1a74160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 7 May 2020 16:49:28 +0200 Subject: [PATCH] WIP: aquatic_ws: start work on data structures, data flow --- Cargo.lock | 84 ++++++++++++- aquatic_ws/Cargo.toml | 2 +- aquatic_ws/src/bin/main.rs | 2 +- aquatic_ws/src/lib/common.rs | 83 +++++++++++++ aquatic_ws/src/lib/handler.rs | 47 +++++++ aquatic_ws/src/lib/lib.rs | 221 +++++---------------------------- aquatic_ws/src/lib/network.rs | 214 +++++++++++++++++++++++++++++++ aquatic_ws/src/lib/protocol.rs | 72 +++++++---- 8 files changed, 503 insertions(+), 222 deletions(-) create mode 100644 aquatic_ws/src/lib/common.rs create mode 100644 aquatic_ws/src/lib/handler.rs create mode 100644 aquatic_ws/src/lib/network.rs diff --git a/Cargo.lock b/Cargo.lock index bced461..2db7cd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,7 +84,7 @@ version = "0.1.0" dependencies = [ "bittorrent_udp", "cli_helpers", - "crossbeam-channel", + "flume", "hashbrown", "histogram", "indexmap", @@ -344,6 +344,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" +[[package]] +name = "flume" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "855e285c3835897065a6ba6f9463b44553eb9f29c7988d692f3d41283b47388b" +dependencies = [ + "futures", + "spin", +] + [[package]] name = "fnv" version = "1.0.6" @@ -365,6 +375,66 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" +[[package]] +name = "futures" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c77d04ce8edd9cb903932b608268b3fffec4163dc053b3b402bf47eac1f1a8" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a" + +[[package]] +name = "futures-io" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a638959aa96152c7a4cddf50fcb1e3fede0583b27157c26e67d6f99904090dc6" + +[[package]] +name = "futures-sink" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3466821b4bc114d95b087b850a724c6f83115e929bc88f1fa98a3304a944c8a6" + +[[package]] +name = "futures-task" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0a34e53cf6cdcd0178aa573aed466b646eb3db769570841fda0c7ede375a27" + +[[package]] +name = "futures-util" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5" +dependencies = [ + "futures-core", + "futures-sink", + "futures-task", + "pin-utils", +] + [[package]] name = "generic-array" version = "0.12.3" @@ -810,6 +880,12 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pkg-config" version = "0.3.17" @@ -1092,6 +1168,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "syn" version = "1.0.18" diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 3497d23..c71b4ae 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -16,7 +16,7 @@ path = "src/bin/main.rs" [dependencies] bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } -crossbeam-channel = "0.4" +flume = "0.7" hashbrown = "0.7" histogram = "0.6" indexmap = "1" diff --git a/aquatic_ws/src/bin/main.rs b/aquatic_ws/src/bin/main.rs index 1c46e7f..fc15728 100644 --- a/aquatic_ws/src/bin/main.rs +++ b/aquatic_ws/src/bin/main.rs @@ -2,5 +2,5 @@ use aquatic_ws; fn main(){ - aquatic_ws::run_network_worker(); + aquatic_ws::run(); } \ No newline at end of file diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs new file mode 100644 index 0000000..c45ef8c --- /dev/null +++ b/aquatic_ws/src/lib/common.rs @@ -0,0 +1,83 @@ +use std::net::SocketAddr; +use std::time::Instant; + +use flume::{Sender, Receiver}; +use hashbrown::HashMap; +use indexmap::IndexMap; + +use crate::protocol::*; + + +pub struct ValidUntil(pub Instant); + + +pub struct Peer { + pub peer_id: PeerId, + pub complete: bool, + pub valid_until: ValidUntil, + + // FIXME: these three could probably be replaced with MessageMeta + pub socket_worker_index: usize, + pub socket_addr: SocketAddr, + pub connection_index: usize, +} + + +pub type PeerMap = IndexMap; + + +pub struct TorrentData { + pub peers: PeerMap, + pub seeders: usize, + pub leechers: usize, +} + + +pub type TorrentMap = HashMap; + + +pub struct State { + pub torrents: TorrentMap, +} + + +impl Default for State { + fn default() -> Self { + Self { + torrents: HashMap::new(), + } + } +} + + +pub struct MessageMeta { + /// Index of socket worker that read this request. Required for sending + /// back response through correct channel to correct worker. + pub socket_worker_index: usize, + /// SocketAddr of peer + pub peer_socket_addr: SocketAddr, + /// Slab index of PeerConnection + pub peer_connection_index: usize, +} + + +pub type InMessageSender = Sender<(MessageMeta, InMessage)>; +pub type InMessageReceiver = Receiver<(MessageMeta, InMessage)>; +pub type OutMessageReceiver = Receiver<(MessageMeta, OutMessage)>; + + +pub struct OutMessageSender(Vec>); + + +impl OutMessageSender { + pub fn new(senders: Vec>) -> Self { + Self(senders) + } + pub fn send( + &self, + meta: MessageMeta, + message: OutMessage + ){ + self.0[meta.socket_worker_index].send((meta, message)); + } +} \ No newline at end of file diff --git a/aquatic_ws/src/lib/handler.rs b/aquatic_ws/src/lib/handler.rs new file mode 100644 index 0000000..dfb1970 --- /dev/null +++ b/aquatic_ws/src/lib/handler.rs @@ -0,0 +1,47 @@ +use std::time::Duration; + +use hashbrown::HashMap; + +use crate::common::*; +use crate::protocol::*; + + +pub fn run_request_worker( + state: State, + in_message_receiver: InMessageReceiver, + out_message_sender: OutMessageSender, +){ + let mut in_messages = Vec::new(); + let mut out_messages = Vec::new(); + + let timeout = Duration::from_micros(200); + + for i in 0..1000 { + if i == 0 { + if let Ok((meta, in_message)) = in_message_receiver.recv(){ + in_messages.push((meta, in_message)); + } + } else { + let res_in_message = in_message_receiver.recv_timeout(timeout); + + if let Ok((meta, in_message)) = res_in_message { + in_messages.push((meta, in_message)); + } else { + break + } + }; + } + + for (meta, in_message) in in_messages.drain(..){ + let out_message = OutMessage::ScrapeResponse(ScrapeResponse { + files: HashMap::new(), + flags: HashMap::new(), + }); + + out_messages.push((meta, out_message)); + } + + for (meta, out_message) in out_messages.drain(..){ + out_message_sender.send(meta, out_message); + } +} \ No newline at end of file diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index 424f625..fce84f0 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -1,213 +1,48 @@ //! There is not much point in doing more work until more clarity on //! exact protocol is achieved -use std::net::{SocketAddr}; -use std::time::{Duration, Instant}; -use std::io::ErrorKind; -use std::option::Option; - -use slab::Slab; -use tungstenite::{Message, WebSocket}; - -use mio::{Events, Poll, Interest, Token}; -use mio::net::{TcpListener, TcpStream}; - +pub mod common; +pub mod handler; +pub mod network; pub mod protocol; - -pub struct PeerConnection { - pub ws: WebSocket, - pub peer_socket_addr: SocketAddr, - pub valid_until: Instant, -} +use common::*; -/// First thoughts on what to send to handler -pub struct HandlerMessage { - /// Index of socket worker that read this request. Required for sending - /// back response through correct channel to correct worker. - pub socket_worker_index: usize, - /// FIXME: Should this be parsed request? - pub message: T, - /// SocketAddr of peer - pub peer_socket_addr: SocketAddr, - /// Slab index of PeerConnection - pub peer_connection_index: usize, -} +pub fn run(){ + let state = State::default(); + let (in_message_sender, in_message_receiver): (InMessageSender, InMessageReceiver) = ::flume::unbounded(); -pub fn run_network_worker(){ - let address: SocketAddr = "0.0.0.0:3000".parse().unwrap(); + let mut out_message_senders = Vec::new(); - let mut listener = TcpListener::bind(address).unwrap(); - let mut poll = Poll::new().expect("create poll"); + for i in 0..2 { + let in_message_sender = in_message_sender.clone(); - poll.registry() - .register(&mut listener, Token(0), Interest::READABLE) - .unwrap(); + let (out_message_sender, out_message_receiver) = ::flume::unbounded(); - let mut events = Events::with_capacity(1024); + out_message_senders.push(out_message_sender); - let timeout = Duration::from_millis(50); + ::std::thread::spawn(move || { + network::run_socket_worker( + i, + in_message_sender, + out_message_receiver, + ); + }); + } - let mut connections: Slab> = Slab::new(); + let out_message_sender = OutMessageSender::new(out_message_senders); - // Insert empty first entry to prevent assignment of index 0 - assert_eq!(connections.insert(None), 0); + ::std::thread::spawn(move || { + handler::run_request_worker( + state, + in_message_receiver, + out_message_sender, + ); + }); loop { - poll.poll(&mut events, Some(timeout)) - .expect("failed polling"); - - let valid_until = Instant::now() + Duration::from_secs(600); - for event in events.iter(){ - let token = event.token(); - - if token.0 == 0 { - loop { - match listener.accept(){ - Ok((mut stream, src)) => { - let entry = connections.vacant_entry(); - let token = Token(entry.key()); - - poll.registry() - .register(&mut stream, token, Interest::READABLE) - .unwrap(); - - // FIXME: will this cause issues due to blocking? - // Should handshake be started manually below - // instead? - let ws = tungstenite::server::accept(stream).unwrap(); - - let peer_connection = PeerConnection { - ws, - peer_socket_addr: src, - valid_until, - }; - - entry.insert(Some(peer_connection)); - }, - Err(err) => { - if err.kind() == ErrorKind::WouldBlock { - break - } - - eprint!("{}", err); - } - } - } - } else if event.is_readable(){ - loop { - if let Some(Some(connection)) = connections.get_mut(token.0){ - match connection.ws.read_message(){ - Ok(message) => { - // FIXME: parse message, send to handler - // through channel (flume?) - - connection.valid_until = valid_until; - }, - Err(tungstenite::Error::Io(err)) => { - if err.kind() == ErrorKind::WouldBlock { - break - } - - eprint!("{}", err); - }, - Err(tungstenite::Error::ConnectionClosed) => { - // FIXME: necessary? - poll.registry().deregister(connection.ws.get_mut()).unwrap(); - - connections.remove(token.0); - }, - Err(err) => { - eprint!("{}", err); - } - } - } - } - } - } - - let now = Instant::now(); - - // Close connections after some time of inactivity and write pending - // messages (which is required after closing anyway.) - // - // FIXME: peers need to be removed too, wherever they are stored - connections.retain(|_, opt_connection| { - if let Some(connection) = opt_connection { - if connection.valid_until < now { - connection.ws.close(None).unwrap(); - } - - loop { - match connection.ws.write_pending(){ - Err(tungstenite::Error::Io(err)) => { - if err.kind() == ErrorKind::WouldBlock { - break - } - }, - Err(tungstenite::Error::ConnectionClosed) => { - // FIXME: necessary? - poll.registry() - .deregister(connection.ws.get_mut()) - .unwrap(); - - return false; - }, - _ => {} - } - } - } - - true - }); - - // TODO: loop through responses from channel, write them to wss or - // possibly register ws as writable (but this means event capacity - // must be adjusted accordingy and is limiting) - - // How should IP's be handled? Send index and src to processing and - // lookup on return that entry is correct. Old ideas: - // Maybe use IndexMap and use numerical - // index for token? Removing element from IndexMap requires shifting - // or swapping indeces, so not very good. - for _ in 0..100 { - let connection_index = 1; - let peer_socket_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); - let message = Message::Text("test".to_string()); - - let opt_connection = connections - .get_mut(connection_index); - - if let Some(Some(connection)) = opt_connection { - if connection.peer_socket_addr != peer_socket_addr { - continue; - } - - match connection.ws.write_message(message){ - Ok(()) => {}, - Err(tungstenite::Error::Io(err)) => { - if err.kind() == ErrorKind::WouldBlock { - continue; - } - - eprint!("{}", err); - }, - Err(tungstenite::Error::ConnectionClosed) => { - // FIXME: necessary? - poll.registry() - .deregister(connection.ws.get_mut()) - .unwrap(); - - connections.remove(connection_index); - }, - Err(err) => { - eprint!("{}", err); - }, - } - } - } } } \ No newline at end of file diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs new file mode 100644 index 0000000..8021446 --- /dev/null +++ b/aquatic_ws/src/lib/network.rs @@ -0,0 +1,214 @@ +use std::net::{SocketAddr}; +use std::time::{Duration, Instant}; +use std::io::ErrorKind; +use std::option::Option; + +use slab::Slab; +use tungstenite::{Message, WebSocket}; + +use mio::{Events, Poll, Interest, Token}; +use mio::net::{TcpListener, TcpStream}; + +use crate::common::*; +use crate::protocol::*; + + +pub struct PeerConnection { + pub ws: WebSocket, + pub peer_socket_addr: SocketAddr, + pub valid_until: Instant, +} + + +pub fn run_socket_worker( + socket_worker_index: usize, + in_message_sender: InMessageSender, + out_message_receiver: OutMessageReceiver, +){ + let address: SocketAddr = "0.0.0.0:3000".parse().unwrap(); + + let mut listener = TcpListener::bind(address).unwrap(); + let mut poll = Poll::new().expect("create poll"); + + poll.registry() + .register(&mut listener, Token(0), Interest::READABLE) + .unwrap(); + + let mut events = Events::with_capacity(1024); + + let timeout = Duration::from_millis(50); + + let mut connections: Slab> = Slab::new(); + + // Insert empty first entry to prevent assignment of index 0 + assert_eq!(connections.insert(None), 0); + + loop { + poll.poll(&mut events, Some(timeout)) + .expect("failed polling"); + + let valid_until = Instant::now() + Duration::from_secs(600); + + for event in events.iter(){ + let token = event.token(); + + if token.0 == 0 { + loop { + match listener.accept(){ + Ok((mut stream, src)) => { + let entry = connections.vacant_entry(); + let token = Token(entry.key()); + + poll.registry() + .register(&mut stream, token, Interest::READABLE) + .unwrap(); + + // FIXME: will this cause issues due to blocking? + // Should handshake be started manually below + // instead? + let ws = tungstenite::server::accept(stream).unwrap(); + + let peer_connection = PeerConnection { + ws, + peer_socket_addr: src, + valid_until, + }; + + entry.insert(Some(peer_connection)); + }, + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + break + } + + eprint!("{}", err); + } + } + } + } else if event.is_readable(){ + loop { + if let Some(Some(connection)) = connections.get_mut(token.0){ + match connection.ws.read_message(){ + Ok(message) => { + // FIXME: parse message, send to handler + // through channel (flume?) + + let meta = MessageMeta { + socket_worker_index, + peer_connection_index: token.0, + peer_socket_addr: connection.peer_socket_addr + }; + + let in_message = InMessage::ScrapeRequest(ScrapeRequest { + info_hashes: None, + }); + + in_message_sender.send((meta, in_message)); + + connection.valid_until = valid_until; + }, + Err(tungstenite::Error::Io(err)) => { + if err.kind() == ErrorKind::WouldBlock { + break + } + + eprint!("{}", err); + }, + Err(tungstenite::Error::ConnectionClosed) => { + // FIXME: necessary? + poll.registry().deregister(connection.ws.get_mut()).unwrap(); + + connections.remove(token.0); + }, + Err(err) => { + eprint!("{}", err); + } + } + } + } + } + } + + let now = Instant::now(); + + // Close connections after some time of inactivity and write pending + // messages (which is required after closing anyway.) + // + // FIXME: peers need to be removed too, wherever they are stored + connections.retain(|_, opt_connection| { + if let Some(connection) = opt_connection { + if connection.valid_until < now { + connection.ws.close(None).unwrap(); + } + + loop { + match connection.ws.write_pending(){ + Err(tungstenite::Error::Io(err)) => { + if err.kind() == ErrorKind::WouldBlock { + break + } + }, + Err(tungstenite::Error::ConnectionClosed) => { + // FIXME: necessary? + poll.registry() + .deregister(connection.ws.get_mut()) + .unwrap(); + + return false; + }, + _ => {} + } + } + } + + true + }); + + // TODO: loop through responses from channel, write them to wss or + // possibly register ws as writable (but this means event capacity + // must be adjusted accordingy and is limiting) + + // How should IP's be handled? Send index and src to processing and + // lookup on return that entry is correct. Old ideas: + // Maybe use IndexMap and use numerical + // index for token? Removing element from IndexMap requires shifting + // or swapping indeces, so not very good. + + for (meta, out_message) in out_message_receiver.drain(){ + let message = Message::Text("test".to_string()); + + let opt_connection = connections + .get_mut(meta.peer_connection_index); + + if let Some(Some(connection)) = opt_connection { + if connection.peer_socket_addr != meta.peer_socket_addr { + eprintln!("socket worker: peer socket addrs didn't match"); + + continue; + } + + match connection.ws.write_message(message){ + Ok(()) => {}, + Err(tungstenite::Error::Io(err)) => { + if err.kind() == ErrorKind::WouldBlock { + continue; + } + + eprint!("{}", err); + }, + Err(tungstenite::Error::ConnectionClosed) => { + // FIXME: necessary? + poll.registry() + .deregister(connection.ws.get_mut()) + .unwrap(); + + connections.remove(meta.peer_connection_index); + }, + Err(err) => { + eprint!("{}", err); + }, + } + } + } + } +} \ No newline at end of file diff --git a/aquatic_ws/src/lib/protocol.rs b/aquatic_ws/src/lib/protocol.rs index ce7a3ae..01b210d 100644 --- a/aquatic_ws/src/lib/protocol.rs +++ b/aquatic_ws/src/lib/protocol.rs @@ -1,15 +1,19 @@ -use std::collections::HashMap; use std::net::IpAddr; +use hashbrown::HashMap; -/// TODO: will need to store socket worker index and connection index -/// for middleman activities, also save SocketAddr instead of IP and port, -/// maybe, for comparison in socket worker -pub struct Peer { - pub complete: bool, // bytes_left == 0 - pub peer_id: [u8; 20], + +pub struct PeerId(pub [u8; 20]); + + +pub struct InfoHash(pub [u8; 20]); + + +pub struct ResponsePeer { + pub peer_id: PeerId, pub ip: IpAddr, // From src socket addr pub port: u16, // From src port + pub complete: bool, // bytes_left == 0 } @@ -21,14 +25,21 @@ pub enum AnnounceEvent { } +impl Default for AnnounceEvent { + fn default() -> Self { + Self::Update + } +} + + /// Apparently, these are sent to a number of peers when they are set /// in an AnnounceRequest /// action = "announce" pub struct MiddlemanOfferToPeer { + pub peer_id: PeerId, // Peer id of peer sending offer + pub info_hash: InfoHash, pub offer: (), // Gets copied from AnnounceRequestOffer pub offer_id: (), // Gets copied from AnnounceRequestOffer - pub peer_id: [u8; 20], // Peer id of peer sending offer - pub info_hash: [u8; 20], } @@ -36,10 +47,10 @@ pub struct MiddlemanOfferToPeer { /// peer id == "to_peer_id" field /// Action field should be 'announce' pub struct MiddlemanAnswerToPeer { + pub peer_id: PeerId, + pub info_hash: InfoHash, pub answer: bool, pub offer_id: (), - pub peer_id: [u8; 20], - pub info_hash: [u8; 20], } @@ -51,8 +62,8 @@ pub struct AnnounceRequestOffer { pub struct AnnounceRequest { - pub info_hash: [u8; 20], // FIXME: I think these are actually really just strings with 20 len, same with peer id - pub peer_id: [u8; 20], + pub info_hash: InfoHash, // FIXME: I think these are actually really just strings with 20 len, same with peer id + pub peer_id: PeerId, pub bytes_left: bool, // Just called "left" in protocol pub event: AnnounceEvent, // Can be empty? Then, default is "update" @@ -63,31 +74,30 @@ pub struct AnnounceRequest { /// If false, send response before sending offers (or possibly "skip sending update back"?) /// If true, send MiddlemanAnswerToPeer to peer with "to_peer_id" as peer_id. pub answer: bool, - pub to_peer_id: Option<[u8; 20]>, // Only parsed to hex if answer == true, probably undefined otherwise + pub to_peer_id: Option, // Only parsed to hex if answer == true, probably undefined otherwise } pub struct AnnounceResponse { - pub info_hash: [u8; 20], + pub info_hash: InfoHash, pub complete: usize, pub incomplete: usize, // I suspect receivers don't care about this and rely on offers instead?? - // Also, what does it contain, exacly? - pub peers: Vec<()>, + // Also, what does it contain, exacly (not certain that it is ResponsePeer?) + pub peers: Vec, pub interval: usize, // Default 2 min probably // Sent to "to_peer_id" peer (?? or did I put this into MiddlemanAnswerToPeer instead?) - pub offer_id: (), - pub answer: bool, + // pub offer_id: (), + // pub answer: bool, } - pub struct ScrapeRequest { // If omitted, scrape for all torrents, apparently // There is some kind of parsing here too which accepts a single info hash // and puts it into a vector - pub info_hashes: Option>, + pub info_hashes: Option>, } @@ -99,10 +109,20 @@ pub struct ScrapeStatistics { pub struct ScrapeResponse { - pub files: HashMap<[u8; 20], ScrapeStatistics>, // InfoHash to Scrape stats + pub files: HashMap, // InfoHash to Scrape stats pub flags: HashMap, } -//pub struct ScrapeResponse { -// pub complete: usize, -// pub incomplete: usize, -//} \ No newline at end of file + + +pub enum InMessage { + AnnounceRequest(AnnounceRequest), + ScrapeRequest(ScrapeRequest), +} + + +pub enum OutMessage { + AnnounceResponse(AnnounceResponse), + ScrapeResponse(ScrapeResponse), + Offer(MiddlemanOfferToPeer), + Answer(MiddlemanAnswerToPeer), +} \ No newline at end of file