mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-01 18:25:30 +00:00
WIP: aquatic_ws: minor work, renaming, mock impls
This commit is contained in:
parent
cf75a07a7e
commit
c94b3d7d3c
5 changed files with 47 additions and 36 deletions
4
TODO.md
4
TODO.md
|
|
@ -1,5 +1,9 @@
|
||||||
# TODO
|
# TODO
|
||||||
|
|
||||||
|
## aquatic_ws
|
||||||
|
* serde
|
||||||
|
* handler
|
||||||
|
|
||||||
## aquatic
|
## aquatic
|
||||||
* mio: set oneshot for epoll and kqueue? otherwise, stop reregistering?
|
* mio: set oneshot for epoll and kqueue? otherwise, stop reregistering?
|
||||||
* Handle Ipv4 and Ipv6 peers. Probably split torrent state. Ipv4 peers
|
* Handle Ipv4 and Ipv6 peers. Probably split torrent state. Ipv4 peers
|
||||||
|
|
|
||||||
|
|
@ -15,11 +15,7 @@ pub struct Peer {
|
||||||
pub peer_id: PeerId,
|
pub peer_id: PeerId,
|
||||||
pub complete: bool,
|
pub complete: bool,
|
||||||
pub valid_until: ValidUntil,
|
pub valid_until: ValidUntil,
|
||||||
|
pub connection_meta: ConnectionMeta,
|
||||||
// FIXME: these three could probably be replaced with MessageMeta
|
|
||||||
pub socket_worker_index: usize,
|
|
||||||
pub socket_addr: SocketAddr,
|
|
||||||
pub connection_index: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -50,32 +46,32 @@ impl Default for State {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub struct MessageMeta {
|
pub struct ConnectionMeta {
|
||||||
/// Index of socket worker that read this request. Required for sending
|
/// Index of socket worker responsible for this connection. Required for
|
||||||
/// back response through correct channel to correct worker.
|
/// sending back response through correct channel to correct worker.
|
||||||
pub socket_worker_index: usize,
|
pub socket_worker_index: usize,
|
||||||
/// SocketAddr of peer
|
/// SocketAddr of peer
|
||||||
pub peer_socket_addr: SocketAddr,
|
pub peer_socket_addr: SocketAddr,
|
||||||
/// Slab index of PeerConnection
|
/// Slab index of PeerConnection
|
||||||
pub peer_connection_index: usize,
|
pub socket_worker_slab_index: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub type InMessageSender = Sender<(MessageMeta, InMessage)>;
|
pub type InMessageSender = Sender<(ConnectionMeta, InMessage)>;
|
||||||
pub type InMessageReceiver = Receiver<(MessageMeta, InMessage)>;
|
pub type InMessageReceiver = Receiver<(ConnectionMeta, InMessage)>;
|
||||||
pub type OutMessageReceiver = Receiver<(MessageMeta, OutMessage)>;
|
pub type OutMessageReceiver = Receiver<(ConnectionMeta, OutMessage)>;
|
||||||
|
|
||||||
|
|
||||||
pub struct OutMessageSender(Vec<Sender<(MessageMeta, OutMessage)>>);
|
pub struct OutMessageSender(Vec<Sender<(ConnectionMeta, OutMessage)>>);
|
||||||
|
|
||||||
|
|
||||||
impl OutMessageSender {
|
impl OutMessageSender {
|
||||||
pub fn new(senders: Vec<Sender<(MessageMeta, OutMessage)>>) -> Self {
|
pub fn new(senders: Vec<Sender<(ConnectionMeta, OutMessage)>>) -> Self {
|
||||||
Self(senders)
|
Self(senders)
|
||||||
}
|
}
|
||||||
pub fn send(
|
pub fn send(
|
||||||
&self,
|
&self,
|
||||||
meta: MessageMeta,
|
meta: ConnectionMeta,
|
||||||
message: OutMessage
|
message: OutMessage
|
||||||
){
|
){
|
||||||
self.0[meta.socket_worker_index].send((meta, message));
|
self.0[meta.socket_worker_index].send((meta, message));
|
||||||
|
|
|
||||||
|
|
@ -10,9 +10,11 @@ use common::*;
|
||||||
|
|
||||||
|
|
||||||
pub fn run(){
|
pub fn run(){
|
||||||
|
let address: ::std::net::SocketAddr = "0.0.0.0:3000".parse().unwrap();
|
||||||
|
|
||||||
let state = State::default();
|
let state = State::default();
|
||||||
|
|
||||||
let (in_message_sender, in_message_receiver): (InMessageSender, InMessageReceiver) = ::flume::unbounded();
|
let (in_message_sender, in_message_receiver) = ::flume::unbounded();
|
||||||
|
|
||||||
let mut out_message_senders = Vec::new();
|
let mut out_message_senders = Vec::new();
|
||||||
|
|
||||||
|
|
@ -25,6 +27,7 @@ pub fn run(){
|
||||||
|
|
||||||
::std::thread::spawn(move || {
|
::std::thread::spawn(move || {
|
||||||
network::run_socket_worker(
|
network::run_socket_worker(
|
||||||
|
address,
|
||||||
i,
|
i,
|
||||||
in_message_sender,
|
in_message_sender,
|
||||||
out_message_receiver,
|
out_message_receiver,
|
||||||
|
|
@ -43,6 +46,6 @@ pub fn run(){
|
||||||
});
|
});
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
::std::thread::sleep(::std::time::Duration::from_secs(60));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -21,12 +21,11 @@ pub struct PeerConnection {
|
||||||
|
|
||||||
|
|
||||||
pub fn run_socket_worker(
|
pub fn run_socket_worker(
|
||||||
|
address: SocketAddr,
|
||||||
socket_worker_index: usize,
|
socket_worker_index: usize,
|
||||||
in_message_sender: InMessageSender,
|
in_message_sender: InMessageSender,
|
||||||
out_message_receiver: OutMessageReceiver,
|
out_message_receiver: OutMessageReceiver,
|
||||||
){
|
){
|
||||||
let address: SocketAddr = "0.0.0.0:3000".parse().unwrap();
|
|
||||||
|
|
||||||
let mut listener = TcpListener::bind(address).unwrap();
|
let mut listener = TcpListener::bind(address).unwrap();
|
||||||
let mut poll = Poll::new().expect("create poll");
|
let mut poll = Poll::new().expect("create poll");
|
||||||
|
|
||||||
|
|
@ -90,15 +89,15 @@ pub fn run_socket_worker(
|
||||||
if let Some(Some(connection)) = connections.get_mut(token.0){
|
if let Some(Some(connection)) = connections.get_mut(token.0){
|
||||||
match connection.ws.read_message(){
|
match connection.ws.read_message(){
|
||||||
Ok(message) => {
|
Ok(message) => {
|
||||||
// FIXME: parse message, send to handler
|
// TODO: convert tungstenite::Message to in_message
|
||||||
// through channel (flume?)
|
|
||||||
|
|
||||||
let meta = MessageMeta {
|
let meta = ConnectionMeta {
|
||||||
socket_worker_index,
|
socket_worker_index,
|
||||||
peer_connection_index: token.0,
|
socket_worker_slab_index: token.0,
|
||||||
peer_socket_addr: connection.peer_socket_addr
|
peer_socket_addr: connection.peer_socket_addr
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Dummy in_message
|
||||||
let in_message = InMessage::ScrapeRequest(ScrapeRequest {
|
let in_message = InMessage::ScrapeRequest(ScrapeRequest {
|
||||||
info_hashes: None,
|
info_hashes: None,
|
||||||
});
|
});
|
||||||
|
|
@ -116,7 +115,9 @@ pub fn run_socket_worker(
|
||||||
},
|
},
|
||||||
Err(tungstenite::Error::ConnectionClosed) => {
|
Err(tungstenite::Error::ConnectionClosed) => {
|
||||||
// FIXME: necessary?
|
// FIXME: necessary?
|
||||||
poll.registry().deregister(connection.ws.get_mut()).unwrap();
|
poll.registry()
|
||||||
|
.deregister(connection.ws.get_mut())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
connections.remove(token.0);
|
connections.remove(token.0);
|
||||||
},
|
},
|
||||||
|
|
@ -164,21 +165,14 @@ pub fn run_socket_worker(
|
||||||
true
|
true
|
||||||
});
|
});
|
||||||
|
|
||||||
// TODO: loop through responses from channel, write them to wss or
|
// Read messages from channel, send to peers
|
||||||
// possibly register ws as writable (but this means event capacity
|
// TODO: convert out_message to tungstenite::Message
|
||||||
// must be adjusted accordingy and is limiting)
|
|
||||||
|
|
||||||
// How should IP's be handled? Send index and src to processing and
|
|
||||||
// lookup on return that entry is correct. Old ideas:
|
|
||||||
// Maybe use IndexMap<SocketAddr, WebSocket> and use numerical
|
|
||||||
// index for token? Removing element from IndexMap requires shifting
|
|
||||||
// or swapping indeces, so not very good.
|
|
||||||
|
|
||||||
for (meta, out_message) in out_message_receiver.drain(){
|
for (meta, out_message) in out_message_receiver.drain(){
|
||||||
|
// dummy message
|
||||||
let message = Message::Text("test".to_string());
|
let message = Message::Text("test".to_string());
|
||||||
|
|
||||||
let opt_connection = connections
|
let opt_connection = connections
|
||||||
.get_mut(meta.peer_connection_index);
|
.get_mut(meta.socket_worker_slab_index);
|
||||||
|
|
||||||
if let Some(Some(connection)) = opt_connection {
|
if let Some(Some(connection)) = opt_connection {
|
||||||
if connection.peer_socket_addr != meta.peer_socket_addr {
|
if connection.peer_socket_addr != meta.peer_socket_addr {
|
||||||
|
|
@ -202,7 +196,7 @@ pub fn run_socket_worker(
|
||||||
.deregister(connection.ws.get_mut())
|
.deregister(connection.ws.get_mut())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
connections.remove(meta.peer_connection_index);
|
connections.remove(meta.socket_worker_slab_index);
|
||||||
},
|
},
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprint!("{}", err);
|
eprint!("{}", err);
|
||||||
|
|
|
||||||
|
|
@ -120,9 +120,23 @@ pub enum InMessage {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl InMessage {
|
||||||
|
fn from_ws_message(ws_messge: tungstenite::Message) -> Result<Self, ()> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
pub enum OutMessage {
|
pub enum OutMessage {
|
||||||
AnnounceResponse(AnnounceResponse),
|
AnnounceResponse(AnnounceResponse),
|
||||||
ScrapeResponse(ScrapeResponse),
|
ScrapeResponse(ScrapeResponse),
|
||||||
Offer(MiddlemanOfferToPeer),
|
Offer(MiddlemanOfferToPeer),
|
||||||
Answer(MiddlemanAnswerToPeer),
|
Answer(MiddlemanAnswerToPeer),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl OutMessage {
|
||||||
|
fn to_ws_message(self) -> tungstenite::Message {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue