WIP: aquatic_ws: rewrite network code, announce working somewhat

This commit is contained in:
Joakim Frostegård 2020-05-10 20:24:04 +02:00
parent 75c8ccd523
commit 003e5f2df9
4 changed files with 139 additions and 91 deletions

View file

@ -86,44 +86,50 @@ pub fn handle_announce_requests(
let valid_until = ValidUntil::new(240); let valid_until = ValidUntil::new(240);
for (sender_meta, request) in requests { for (sender_meta, request) in requests {
let torrent_data = torrents.entry(request.info_hash) let info_hash = request.info_hash;
let peer_id = request.peer_id;
let torrent_data = torrents.entry(info_hash.clone())
.or_default(); .or_default();
let peer_status = PeerStatus::from_event_and_bytes_left( // FIXME: correct to only update when bytes_left is Some?
request.event, if let Some(bytes_left) = request.bytes_left {
request.bytes_left let peer_status = PeerStatus::from_event_and_bytes_left(
); request.event,
bytes_left
);
let peer = Peer { let peer = Peer {
connection_meta: sender_meta, connection_meta: sender_meta,
status: peer_status, status: peer_status,
valid_until, valid_until,
}; };
let opt_removed_peer = match peer_status { let opt_removed_peer = match peer_status {
PeerStatus::Leeching => { PeerStatus::Leeching => {
torrent_data.num_leechers += 1; torrent_data.num_leechers += 1;
torrent_data.peers.insert(request.peer_id, peer) torrent_data.peers.insert(peer_id.clone(), peer)
}, },
PeerStatus::Seeding => { PeerStatus::Seeding => {
torrent_data.num_seeders += 1; torrent_data.num_seeders += 1;
torrent_data.peers.insert(request.peer_id, peer) torrent_data.peers.insert(peer_id.clone(), peer)
}, },
PeerStatus::Stopped => { PeerStatus::Stopped => {
torrent_data.peers.remove(&request.peer_id) torrent_data.peers.remove(&peer_id)
}
};
match opt_removed_peer.map(|peer| peer.status){
Some(PeerStatus::Leeching) => {
torrent_data.num_leechers -= 1;
},
Some(PeerStatus::Seeding) => {
torrent_data.num_seeders -= 1;
},
_ => {}
} }
};
match opt_removed_peer.map(|peer| peer.status){
Some(PeerStatus::Leeching) => {
torrent_data.num_leechers -= 1;
},
Some(PeerStatus::Seeding) => {
torrent_data.num_seeders -= 1;
},
_ => {}
} }
// If peer sent offers, send them on to random peers // If peer sent offers, send them on to random peers
@ -143,8 +149,8 @@ pub fn handle_announce_requests(
for (offer, peer) in offers.into_iter().zip(peers){ for (offer, peer) in offers.into_iter().zip(peers){
let middleman_offer = MiddlemanOfferToPeer { let middleman_offer = MiddlemanOfferToPeer {
info_hash: request.info_hash, info_hash: info_hash.clone(),
peer_id: request.peer_id, peer_id: peer_id.clone(),
offer: offer.offer, offer: offer.offer,
offer_id: offer.offer_id, offer_id: offer.offer_id,
}; };
@ -161,8 +167,8 @@ pub fn handle_announce_requests(
(Some(answer), Some(to_peer_id), Some(offer_id)) => { (Some(answer), Some(to_peer_id), Some(offer_id)) => {
if let Some(to_peer) = torrent_data.peers.get(&to_peer_id){ if let Some(to_peer) = torrent_data.peers.get(&to_peer_id){
let middleman_answer = MiddlemanAnswerToPeer { let middleman_answer = MiddlemanAnswerToPeer {
peer_id: request.peer_id, peer_id: peer_id,
info_hash: request.info_hash, info_hash: info_hash.clone(),
answer, answer,
offer_id, offer_id,
}; };
@ -177,7 +183,7 @@ pub fn handle_announce_requests(
} }
let response = OutMessage::AnnounceResponse(AnnounceResponse { let response = OutMessage::AnnounceResponse(AnnounceResponse {
info_hash: request.info_hash, info_hash: info_hash,
complete: torrent_data.num_seeders, complete: torrent_data.num_seeders,
incomplete: torrent_data.num_leechers, incomplete: torrent_data.num_leechers,
announce_interval: 120, // FIXME: config announce_interval: 120, // FIXME: config

View file

@ -15,7 +15,13 @@ use crate::common::*;
use crate::protocol::*; use crate::protocol::*;
pub enum Connection { pub struct Connection {
valid_until: Option<ValidUntil>,
stage: ConnectionStage,
}
pub enum ConnectionStage {
Stream(TcpStream), Stream(TcpStream),
MidHandshake(MidHandshake<ServerHandshake<TcpStream, DebugCallback>>), MidHandshake(MidHandshake<ServerHandshake<TcpStream, DebugCallback>>),
Established(PeerConnection), Established(PeerConnection),
@ -49,8 +55,13 @@ pub fn run_socket_worker(
let mut connections: IndexMap<usize, Connection> = IndexMap::new(); let mut connections: IndexMap<usize, Connection> = IndexMap::new();
let placeholder = Connection {
valid_until: None,
stage: ConnectionStage::Placeholder,
};
// Insert empty first entry to prevent assignment of index 0 // Insert empty first entry to prevent assignment of index 0
assert_eq!(connections.insert_full(0, Connection::Placeholder).0, 0); assert_eq!(connections.insert_full(0, placeholder).0, 0);
loop { loop {
poll.poll(&mut events, Some(timeout)) poll.poll(&mut events, Some(timeout))
@ -139,7 +150,12 @@ fn accept_new_streams(
.register(&mut stream, token, Interest::READABLE) .register(&mut stream, token, Interest::READABLE)
.unwrap(); .unwrap();
connections.insert(token.0, Connection::Stream(stream)); let connection = Connection {
valid_until: Some(valid_until),
stage: ConnectionStage::Stream(stream)
};
connections.insert(token.0, connection);
}, },
Err(err) => { Err(err) => {
if err.kind() == ErrorKind::WouldBlock { if err.kind() == ErrorKind::WouldBlock {
@ -162,6 +178,7 @@ impl ::tungstenite::handshake::server::Callback for DebugCallback {
response: ::tungstenite::handshake::server::Response, response: ::tungstenite::handshake::server::Response,
) -> Result<::tungstenite::handshake::server::Response, ::tungstenite::handshake::server::ErrorResponse> { ) -> Result<::tungstenite::handshake::server::Response, ::tungstenite::handshake::server::ErrorResponse> {
println!("request: {:#?}", request); println!("request: {:#?}", request);
println!("response: {:#?}", response);
Ok(response) Ok(response)
} }
@ -179,19 +196,19 @@ pub fn read_and_forward_in_messages(
println!("poll_token: {}", poll_token.0); println!("poll_token: {}", poll_token.0);
loop { loop {
let established = match connections.get_index(poll_token.0){ let established = match connections.get_index(poll_token.0).map(|(_, v)| &v.stage){
Some((_, Connection::Stream(_))) => false, Some(ConnectionStage::Stream(_)) => false,
Some((_, Connection::MidHandshake(_))) => false, Some(ConnectionStage::MidHandshake(_)) => false,
Some((_, Connection::Established(_))) => true, Some(ConnectionStage::Established(_)) => true,
Some((_, Connection::Placeholder)) => unreachable!(), Some(ConnectionStage::Placeholder) => unreachable!(),
None => break, None => break,
}; };
if !established { if !established {
let conn = connections.remove(&poll_token.0).unwrap(); let conn = connections.remove(&poll_token.0).unwrap();
match conn { match conn.stage {
Connection::Stream(stream) => { ConnectionStage::Stream(stream) => {
let peer_socket_addr = stream.peer_addr().unwrap(); let peer_socket_addr = stream.peer_addr().unwrap();
match ::tungstenite::server::accept_hdr(stream, DebugCallback){ match ::tungstenite::server::accept_hdr(stream, DebugCallback){
@ -202,21 +219,32 @@ pub fn read_and_forward_in_messages(
peer_socket_addr, peer_socket_addr,
valid_until, valid_until,
}; };
let connection = Connection {
valid_until: Some(valid_until),
stage: ConnectionStage::Established(peer_connection)
};
connections.insert(poll_token.0, Connection::Established(peer_connection)); connections.insert(poll_token.0, connection);
}, },
Err(HandshakeError::Interrupted(handshake)) => { Err(HandshakeError::Interrupted(handshake)) => {
println!("interrupted"); println!("interrupted");
connections.insert(poll_token.0, Connection::MidHandshake(handshake));
let connection = Connection {
valid_until: Some(valid_until),
stage: ConnectionStage::MidHandshake(handshake),
};
connections.insert(poll_token.0, connection);
break; break;
}, },
Err(HandshakeError::Failure(err)) => { Err(HandshakeError::Failure(err)) => {
eprintln!("handshake: {}", err) dbg!(err);
} }
} }
}, },
Connection::MidHandshake(mut handshake) => { ConnectionStage::MidHandshake(mut handshake) => {
let stream = handshake.get_mut().get_mut(); let stream = handshake.get_mut().get_mut();
let peer_socket_addr = stream.peer_addr().unwrap(); let peer_socket_addr = stream.peer_addr().unwrap();
@ -228,25 +256,41 @@ pub fn read_and_forward_in_messages(
peer_socket_addr, peer_socket_addr,
valid_until, valid_until,
}; };
let connection = Connection {
valid_until: Some(valid_until),
stage: ConnectionStage::Established(peer_connection)
};
connections.insert(poll_token.0, Connection::Established(peer_connection)); connections.insert(poll_token.0, connection);
}, },
Err(HandshakeError::Interrupted(handshake)) => { Err(HandshakeError::Interrupted(handshake)) => {
connections.insert(poll_token.0, Connection::MidHandshake(handshake)); let connection = Connection {
valid_until: Some(valid_until),
stage: ConnectionStage::MidHandshake(handshake),
};
connections.insert(poll_token.0, connection);
break; break;
}, },
Err(err) => eprintln!("handshake: {}", err), Err(err) => {
dbg!(err);
},
} }
}, },
_ => unreachable!(), _ => unreachable!(),
} }
} else if let Some(Connection::Established(connection)) = connections.get_mut(&poll_token.0){ } else if let Some(Connection{ stage: ConnectionStage::Established(connection), ..}) = connections.get_mut(&poll_token.0){
println!("conn established"); println!("conn established");
match connection.ws.read_message(){ match connection.ws.read_message(){
Ok(ws_message) => { Ok(ws_message) => {
dbg!(ws_message.clone());
if let Some(in_message) = InMessage::from_ws_message(ws_message){ if let Some(in_message) = InMessage::from_ws_message(ws_message){
dbg!(in_message.clone());
let meta = ConnectionMeta { let meta = ConnectionMeta {
socket_worker_index, socket_worker_index,
socket_worker_slab_index: poll_token.0, socket_worker_slab_index: poll_token.0,
@ -290,15 +334,18 @@ pub fn send_out_messages(
// Read messages from channel, send to peers // Read messages from channel, send to peers
for (meta, out_message) in out_message_receiver { for (meta, out_message) in out_message_receiver {
let opt_connection = connections let opt_connection = connections
.get_mut(&meta.socket_worker_slab_index); .get_mut(&meta.socket_worker_slab_index)
.map(|v| &mut v.stage);
if let Some(Connection::Established(connection)) = opt_connection { if let Some(ConnectionStage::Established(connection)) = opt_connection {
if connection.peer_socket_addr != meta.peer_socket_addr { if connection.peer_socket_addr != meta.peer_socket_addr {
eprintln!("socket worker: peer socket addrs didn't match"); eprintln!("socket worker: peer socket addrs didn't match");
continue; continue;
} }
dbg!(out_message.clone());
match connection.ws.write_message(out_message.to_ws_message()){ match connection.ws.write_message(out_message.to_ws_message()){
Ok(()) => {}, Ok(()) => {},
Err(tungstenite::Error::Io(err)) => { Err(tungstenite::Error::Io(err)) => {

View file

@ -6,7 +6,7 @@ use super::InfoHash;
struct TwentyByteVisitor; struct TwentyByteVisitor;
impl<'de> Visitor<'de> for TwentyByteVisitor { impl<'de> Visitor<'de> for TwentyByteVisitor {
type Value = [u8; 20]; type Value = String;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("string consisting of 20 bytes") formatter.write_str("string consisting of 20 bytes")
@ -15,24 +15,19 @@ impl<'de> Visitor<'de> for TwentyByteVisitor {
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E> fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where E: ::serde::de::Error, where E: ::serde::de::Error,
{ {
let mut arr = [0u8; 20]; if value.chars().count() == 20 { // FIXME
Ok(value.to_string())
let bytes = value.as_bytes();
if bytes.len() == 20 {
arr.copy_from_slice(&bytes);
Ok(arr)
} else { } else {
Err(E::custom(format!("not 20 bytes: {}", value))) Err(E::custom(format!("not 20 bytes: {}", value)))
} }
} }
} }
pub fn deserialize_20_bytes<'de, D>( pub fn deserialize_20_bytes<'de, D>(
deserializer: D deserializer: D
) -> Result<[u8; 20], D::Error> ) -> Result<String, D::Error>
where D: Deserializer<'de> where D: Deserializer<'de>
{ {
deserializer.deserialize_any(TwentyByteVisitor) deserializer.deserialize_any(TwentyByteVisitor)
@ -100,11 +95,7 @@ mod tests {
use super::*; use super::*;
fn info_hash_from_bytes(bytes: &[u8]) -> InfoHash { fn info_hash_from_bytes(bytes: &[u8]) -> InfoHash {
let mut arr = [0u8; 20]; InfoHash(String::from_utf8_lossy(bytes).to_string())
arr.copy_from_slice(bytes);
InfoHash(arr)
} }
#[test] #[test]

View file

@ -6,37 +6,37 @@ pub mod deserialize;
use deserialize::*; use deserialize::*;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)] #[serde(transparent)]
pub struct PeerId( pub struct PeerId(
#[serde(deserialize_with = "deserialize_20_bytes")] #[serde(deserialize_with = "deserialize_20_bytes")]
pub [u8; 20] pub String
); );
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)] #[serde(transparent)]
pub struct InfoHash( pub struct InfoHash(
#[serde(deserialize_with = "deserialize_20_bytes")] #[serde(deserialize_with = "deserialize_20_bytes")]
pub [u8; 20] pub String
); );
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)] #[serde(transparent)]
pub struct OfferId( pub struct OfferId(
#[serde(deserialize_with = "deserialize_20_bytes")] #[serde(deserialize_with = "deserialize_20_bytes")]
pub [u8; 20] pub String
); );
/// Some kind of nested structure from https://www.npmjs.com/package/simple-peer /// Some kind of nested structure from https://www.npmjs.com/package/simple-peer
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(transparent)] #[serde(transparent)]
pub struct JsonValue(pub ::serde_json::Value); pub struct JsonValue(pub ::serde_json::Value);
#[derive(Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum AnnounceEvent { pub enum AnnounceEvent {
Started, Started,
@ -56,7 +56,7 @@ impl Default for AnnounceEvent {
/// Apparently, these are sent to a number of peers when they are set /// Apparently, these are sent to a number of peers when they are set
/// in an AnnounceRequest /// in an AnnounceRequest
/// action = "announce" /// action = "announce"
#[derive(Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct MiddlemanOfferToPeer { pub struct MiddlemanOfferToPeer {
/// Peer id of peer sending offer /// Peer id of peer sending offer
/// Note: if equal to client peer_id, client ignores offer /// Note: if equal to client peer_id, client ignores offer
@ -72,7 +72,7 @@ pub struct MiddlemanOfferToPeer {
/// If announce request has answer = true, send this to peer with /// If announce request has answer = true, send this to peer with
/// peer id == "to_peer_id" field /// peer id == "to_peer_id" field
/// Action field should be 'announce' /// Action field should be 'announce'
#[derive(Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct MiddlemanAnswerToPeer { pub struct MiddlemanAnswerToPeer {
/// Note: if equal to client peer_id, client ignores answer /// Note: if equal to client peer_id, client ignores answer
pub peer_id: PeerId, pub peer_id: PeerId,
@ -83,20 +83,20 @@ pub struct MiddlemanAnswerToPeer {
/// Element of AnnounceRequest.offers /// Element of AnnounceRequest.offers
#[derive(Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
pub struct AnnounceRequestOffer { pub struct AnnounceRequestOffer {
pub offer: JsonValue, pub offer: JsonValue,
pub offer_id: OfferId, pub offer_id: OfferId,
} }
#[derive(Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
pub struct AnnounceRequest { pub struct AnnounceRequest {
pub info_hash: InfoHash, pub info_hash: InfoHash,
pub peer_id: PeerId, pub peer_id: PeerId,
/// Just called "left" in protocol /// Just called "left" in protocol
#[serde(rename = "left")] #[serde(rename = "left")]
pub bytes_left: usize, // FIXME: I had this set as bool before, check! pub bytes_left: Option<usize>, // FIXME: I had this set as bool before, check!
/// Can be empty. Then, default is "update" /// Can be empty. Then, default is "update"
#[serde(default)] #[serde(default)]
pub event: AnnounceEvent, pub event: AnnounceEvent,
@ -124,7 +124,7 @@ pub struct AnnounceRequest {
} }
#[derive(Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct AnnounceResponse { pub struct AnnounceResponse {
pub info_hash: InfoHash, pub info_hash: InfoHash,
/// Client checks if this is null, not clear why /// Client checks if this is null, not clear why
@ -135,7 +135,7 @@ pub struct AnnounceResponse {
} }
#[derive(Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
pub struct ScrapeRequest { pub struct ScrapeRequest {
// If omitted, scrape for all torrents, apparently // If omitted, scrape for all torrents, apparently
// There is some kind of parsing here too which accepts a single info hash // There is some kind of parsing here too which accepts a single info hash
@ -149,7 +149,7 @@ pub struct ScrapeRequest {
} }
#[derive(Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct ScrapeStatistics { pub struct ScrapeStatistics {
pub complete: usize, pub complete: usize,
pub incomplete: usize, pub incomplete: usize,
@ -157,7 +157,7 @@ pub struct ScrapeStatistics {
} }
#[derive(Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct ScrapeResponse { pub struct ScrapeResponse {
pub files: HashMap<InfoHash, ScrapeStatistics>, pub files: HashMap<InfoHash, ScrapeStatistics>,
// Looks like `flags` field is ignored in reference client // Looks like `flags` field is ignored in reference client
@ -165,7 +165,7 @@ pub struct ScrapeResponse {
} }
#[derive(Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum Action { pub enum Action {
Announce, Announce,
@ -174,7 +174,7 @@ pub enum Action {
/// Helper for serializing and deserializing messages /// Helper for serializing and deserializing messages
#[derive(Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
struct ActionWrapper<T> { struct ActionWrapper<T> {
pub action: Action, pub action: Action,
#[serde(flatten)] #[serde(flatten)]
@ -198,6 +198,7 @@ impl <T>ActionWrapper<T> {
} }
#[derive(Debug, Clone)]
pub enum InMessage { pub enum InMessage {
AnnounceRequest(AnnounceRequest), AnnounceRequest(AnnounceRequest),
ScrapeRequest(ScrapeRequest), ScrapeRequest(ScrapeRequest),
@ -220,6 +221,8 @@ impl InMessage {
if let Ok(ActionWrapper { action: Action::Announce, inner }) = res { if let Ok(ActionWrapper { action: Action::Announce, inner }) = res {
return Some(InMessage::AnnounceRequest(inner)); return Some(InMessage::AnnounceRequest(inner));
} else {
dbg!(res);
} }
let res: Result<ActionWrapper<ScrapeRequest>, _> = serde_json::from_str(&text); let res: Result<ActionWrapper<ScrapeRequest>, _> = serde_json::from_str(&text);
@ -233,6 +236,7 @@ impl InMessage {
} }
#[derive(Debug, Clone)]
pub enum OutMessage { pub enum OutMessage {
AnnounceResponse(AnnounceResponse), AnnounceResponse(AnnounceResponse),
ScrapeResponse(ScrapeResponse), ScrapeResponse(ScrapeResponse),