diff --git a/TODO.md b/TODO.md index 2c98093..123e87e 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,9 @@ # TODO +## aquatic_ws +* serde +* handler + ## aquatic * mio: set oneshot for epoll and kqueue? otherwise, stop reregistering? * Handle Ipv4 and Ipv6 peers. Probably split torrent state. Ipv4 peers diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs index c45ef8c..c347425 100644 --- a/aquatic_ws/src/lib/common.rs +++ b/aquatic_ws/src/lib/common.rs @@ -15,11 +15,7 @@ pub struct Peer { pub peer_id: PeerId, pub complete: bool, pub valid_until: ValidUntil, - - // FIXME: these three could probably be replaced with MessageMeta - pub socket_worker_index: usize, - pub socket_addr: SocketAddr, - pub connection_index: usize, + pub connection_meta: ConnectionMeta, } @@ -50,32 +46,32 @@ impl Default for State { } -pub struct MessageMeta { - /// Index of socket worker that read this request. Required for sending - /// back response through correct channel to correct worker. +pub struct ConnectionMeta { + /// Index of socket worker responsible for this connection. Required for + /// sending back response through correct channel to correct worker. pub socket_worker_index: usize, /// SocketAddr of peer pub peer_socket_addr: SocketAddr, /// Slab index of PeerConnection - pub peer_connection_index: usize, + pub socket_worker_slab_index: usize, } -pub type InMessageSender = Sender<(MessageMeta, InMessage)>; -pub type InMessageReceiver = Receiver<(MessageMeta, InMessage)>; -pub type OutMessageReceiver = Receiver<(MessageMeta, OutMessage)>; +pub type InMessageSender = Sender<(ConnectionMeta, InMessage)>; +pub type InMessageReceiver = Receiver<(ConnectionMeta, InMessage)>; +pub type OutMessageReceiver = Receiver<(ConnectionMeta, OutMessage)>; -pub struct OutMessageSender(Vec>); +pub struct OutMessageSender(Vec>); impl OutMessageSender { - pub fn new(senders: Vec>) -> Self { + pub fn new(senders: Vec>) -> Self { Self(senders) } pub fn send( &self, - meta: MessageMeta, + meta: ConnectionMeta, message: OutMessage ){ self.0[meta.socket_worker_index].send((meta, message)); diff --git a/aquatic_ws/src/lib/lib.rs b/aquatic_ws/src/lib/lib.rs index fce84f0..713ae9b 100644 --- a/aquatic_ws/src/lib/lib.rs +++ b/aquatic_ws/src/lib/lib.rs @@ -10,9 +10,11 @@ use common::*; pub fn run(){ + let address: ::std::net::SocketAddr = "0.0.0.0:3000".parse().unwrap(); + let state = State::default(); - let (in_message_sender, in_message_receiver): (InMessageSender, InMessageReceiver) = ::flume::unbounded(); + let (in_message_sender, in_message_receiver) = ::flume::unbounded(); let mut out_message_senders = Vec::new(); @@ -25,6 +27,7 @@ pub fn run(){ ::std::thread::spawn(move || { network::run_socket_worker( + address, i, in_message_sender, out_message_receiver, @@ -43,6 +46,6 @@ pub fn run(){ }); loop { - + ::std::thread::sleep(::std::time::Duration::from_secs(60)); } } \ No newline at end of file diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs index 8021446..cf39ca2 100644 --- a/aquatic_ws/src/lib/network.rs +++ b/aquatic_ws/src/lib/network.rs @@ -21,12 +21,11 @@ pub struct PeerConnection { pub fn run_socket_worker( + address: SocketAddr, socket_worker_index: usize, in_message_sender: InMessageSender, out_message_receiver: OutMessageReceiver, ){ - let address: SocketAddr = "0.0.0.0:3000".parse().unwrap(); - let mut listener = TcpListener::bind(address).unwrap(); let mut poll = Poll::new().expect("create poll"); @@ -90,15 +89,15 @@ pub fn run_socket_worker( if let Some(Some(connection)) = connections.get_mut(token.0){ match connection.ws.read_message(){ Ok(message) => { - // FIXME: parse message, send to handler - // through channel (flume?) + // TODO: convert tungstenite::Message to in_message - let meta = MessageMeta { + let meta = ConnectionMeta { socket_worker_index, - peer_connection_index: token.0, + socket_worker_slab_index: token.0, peer_socket_addr: connection.peer_socket_addr }; + // Dummy in_message let in_message = InMessage::ScrapeRequest(ScrapeRequest { info_hashes: None, }); @@ -116,7 +115,9 @@ pub fn run_socket_worker( }, Err(tungstenite::Error::ConnectionClosed) => { // FIXME: necessary? - poll.registry().deregister(connection.ws.get_mut()).unwrap(); + poll.registry() + .deregister(connection.ws.get_mut()) + .unwrap(); connections.remove(token.0); }, @@ -164,21 +165,14 @@ pub fn run_socket_worker( true }); - // TODO: loop through responses from channel, write them to wss or - // possibly register ws as writable (but this means event capacity - // must be adjusted accordingy and is limiting) - - // How should IP's be handled? Send index and src to processing and - // lookup on return that entry is correct. Old ideas: - // Maybe use IndexMap and use numerical - // index for token? Removing element from IndexMap requires shifting - // or swapping indeces, so not very good. - + // Read messages from channel, send to peers + // TODO: convert out_message to tungstenite::Message for (meta, out_message) in out_message_receiver.drain(){ + // dummy message let message = Message::Text("test".to_string()); let opt_connection = connections - .get_mut(meta.peer_connection_index); + .get_mut(meta.socket_worker_slab_index); if let Some(Some(connection)) = opt_connection { if connection.peer_socket_addr != meta.peer_socket_addr { @@ -202,7 +196,7 @@ pub fn run_socket_worker( .deregister(connection.ws.get_mut()) .unwrap(); - connections.remove(meta.peer_connection_index); + connections.remove(meta.socket_worker_slab_index); }, Err(err) => { eprint!("{}", err); diff --git a/aquatic_ws/src/lib/protocol.rs b/aquatic_ws/src/lib/protocol.rs index 01b210d..0248198 100644 --- a/aquatic_ws/src/lib/protocol.rs +++ b/aquatic_ws/src/lib/protocol.rs @@ -120,9 +120,23 @@ pub enum InMessage { } +impl InMessage { + fn from_ws_message(ws_messge: tungstenite::Message) -> Result { + unimplemented!() + } +} + + pub enum OutMessage { AnnounceResponse(AnnounceResponse), ScrapeResponse(ScrapeResponse), Offer(MiddlemanOfferToPeer), Answer(MiddlemanAnswerToPeer), +} + + +impl OutMessage { + fn to_ws_message(self) -> tungstenite::Message { + unimplemented!() + } } \ No newline at end of file