diff --git a/aquatic_ws/src/lib/handler.rs b/aquatic_ws/src/lib/handler.rs index 2e2f8e8..c01b131 100644 --- a/aquatic_ws/src/lib/handler.rs +++ b/aquatic_ws/src/lib/handler.rs @@ -86,44 +86,50 @@ pub fn handle_announce_requests( let valid_until = ValidUntil::new(240); for (sender_meta, request) in requests { - let torrent_data = torrents.entry(request.info_hash) + let info_hash = request.info_hash; + let peer_id = request.peer_id; + + let torrent_data = torrents.entry(info_hash.clone()) .or_default(); - let peer_status = PeerStatus::from_event_and_bytes_left( - request.event, - request.bytes_left - ); + // FIXME: correct to only update when bytes_left is Some? + if let Some(bytes_left) = request.bytes_left { + let peer_status = PeerStatus::from_event_and_bytes_left( + request.event, + bytes_left + ); - let peer = Peer { - connection_meta: sender_meta, - status: peer_status, - valid_until, - }; - - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; + let peer = Peer { + connection_meta: sender_meta, + status: peer_status, + valid_until, + }; + + let opt_removed_peer = match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers += 1; - torrent_data.peers.insert(request.peer_id, peer) - }, - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; + torrent_data.peers.insert(peer_id.clone(), peer) + }, + PeerStatus::Seeding => { + torrent_data.num_seeders += 1; - torrent_data.peers.insert(request.peer_id, peer) - }, - PeerStatus::Stopped => { - torrent_data.peers.remove(&request.peer_id) + torrent_data.peers.insert(peer_id.clone(), peer) + }, + PeerStatus::Stopped => { + torrent_data.peers.remove(&peer_id) + } + }; + + match opt_removed_peer.map(|peer| peer.status){ + Some(PeerStatus::Leeching) => { + torrent_data.num_leechers -= 1; + }, + Some(PeerStatus::Seeding) => { + torrent_data.num_seeders -= 1; + }, + _ => {} } - }; - - match opt_removed_peer.map(|peer| peer.status){ - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - }, - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - }, - _ => {} } // If peer sent offers, send them on to random peers @@ -143,8 +149,8 @@ pub fn handle_announce_requests( for (offer, peer) in offers.into_iter().zip(peers){ let middleman_offer = MiddlemanOfferToPeer { - info_hash: request.info_hash, - peer_id: request.peer_id, + info_hash: info_hash.clone(), + peer_id: peer_id.clone(), offer: offer.offer, offer_id: offer.offer_id, }; @@ -161,8 +167,8 @@ pub fn handle_announce_requests( (Some(answer), Some(to_peer_id), Some(offer_id)) => { if let Some(to_peer) = torrent_data.peers.get(&to_peer_id){ let middleman_answer = MiddlemanAnswerToPeer { - peer_id: request.peer_id, - info_hash: request.info_hash, + peer_id: peer_id, + info_hash: info_hash.clone(), answer, offer_id, }; @@ -177,7 +183,7 @@ pub fn handle_announce_requests( } let response = OutMessage::AnnounceResponse(AnnounceResponse { - info_hash: request.info_hash, + info_hash: info_hash, complete: torrent_data.num_seeders, incomplete: torrent_data.num_leechers, announce_interval: 120, // FIXME: config diff --git a/aquatic_ws/src/lib/network.rs b/aquatic_ws/src/lib/network.rs index 620f21c..f3c660c 100644 --- a/aquatic_ws/src/lib/network.rs +++ b/aquatic_ws/src/lib/network.rs @@ -15,7 +15,13 @@ use crate::common::*; use crate::protocol::*; -pub enum Connection { +pub struct Connection { + valid_until: Option, + stage: ConnectionStage, +} + + +pub enum ConnectionStage { Stream(TcpStream), MidHandshake(MidHandshake>), Established(PeerConnection), @@ -49,8 +55,13 @@ pub fn run_socket_worker( let mut connections: IndexMap = IndexMap::new(); + let placeholder = Connection { + valid_until: None, + stage: ConnectionStage::Placeholder, + }; + // Insert empty first entry to prevent assignment of index 0 - assert_eq!(connections.insert_full(0, Connection::Placeholder).0, 0); + assert_eq!(connections.insert_full(0, placeholder).0, 0); loop { poll.poll(&mut events, Some(timeout)) @@ -139,7 +150,12 @@ fn accept_new_streams( .register(&mut stream, token, Interest::READABLE) .unwrap(); - connections.insert(token.0, Connection::Stream(stream)); + let connection = Connection { + valid_until: Some(valid_until), + stage: ConnectionStage::Stream(stream) + }; + + connections.insert(token.0, connection); }, Err(err) => { if err.kind() == ErrorKind::WouldBlock { @@ -162,6 +178,7 @@ impl ::tungstenite::handshake::server::Callback for DebugCallback { response: ::tungstenite::handshake::server::Response, ) -> Result<::tungstenite::handshake::server::Response, ::tungstenite::handshake::server::ErrorResponse> { println!("request: {:#?}", request); + println!("response: {:#?}", response); Ok(response) } @@ -179,19 +196,19 @@ pub fn read_and_forward_in_messages( println!("poll_token: {}", poll_token.0); loop { - let established = match connections.get_index(poll_token.0){ - Some((_, Connection::Stream(_))) => false, - Some((_, Connection::MidHandshake(_))) => false, - Some((_, Connection::Established(_))) => true, - Some((_, Connection::Placeholder)) => unreachable!(), + let established = match connections.get_index(poll_token.0).map(|(_, v)| &v.stage){ + Some(ConnectionStage::Stream(_)) => false, + Some(ConnectionStage::MidHandshake(_)) => false, + Some(ConnectionStage::Established(_)) => true, + Some(ConnectionStage::Placeholder) => unreachable!(), None => break, }; if !established { let conn = connections.remove(&poll_token.0).unwrap(); - match conn { - Connection::Stream(stream) => { + match conn.stage { + ConnectionStage::Stream(stream) => { let peer_socket_addr = stream.peer_addr().unwrap(); match ::tungstenite::server::accept_hdr(stream, DebugCallback){ @@ -202,21 +219,32 @@ pub fn read_and_forward_in_messages( peer_socket_addr, valid_until, }; + + let connection = Connection { + valid_until: Some(valid_until), + stage: ConnectionStage::Established(peer_connection) + }; - connections.insert(poll_token.0, Connection::Established(peer_connection)); + connections.insert(poll_token.0, connection); }, Err(HandshakeError::Interrupted(handshake)) => { println!("interrupted"); - connections.insert(poll_token.0, Connection::MidHandshake(handshake)); + + let connection = Connection { + valid_until: Some(valid_until), + stage: ConnectionStage::MidHandshake(handshake), + }; + + connections.insert(poll_token.0, connection); break; }, Err(HandshakeError::Failure(err)) => { - eprintln!("handshake: {}", err) + dbg!(err); } } }, - Connection::MidHandshake(mut handshake) => { + ConnectionStage::MidHandshake(mut handshake) => { let stream = handshake.get_mut().get_mut(); let peer_socket_addr = stream.peer_addr().unwrap(); @@ -228,25 +256,41 @@ pub fn read_and_forward_in_messages( peer_socket_addr, valid_until, }; + + let connection = Connection { + valid_until: Some(valid_until), + stage: ConnectionStage::Established(peer_connection) + }; - connections.insert(poll_token.0, Connection::Established(peer_connection)); + connections.insert(poll_token.0, connection); }, Err(HandshakeError::Interrupted(handshake)) => { - connections.insert(poll_token.0, Connection::MidHandshake(handshake)); + let connection = Connection { + valid_until: Some(valid_until), + stage: ConnectionStage::MidHandshake(handshake), + }; + + connections.insert(poll_token.0, connection); break; }, - Err(err) => eprintln!("handshake: {}", err), + Err(err) => { + dbg!(err); + }, } }, _ => unreachable!(), } - } else if let Some(Connection::Established(connection)) = connections.get_mut(&poll_token.0){ + } else if let Some(Connection{ stage: ConnectionStage::Established(connection), ..}) = connections.get_mut(&poll_token.0){ println!("conn established"); match connection.ws.read_message(){ Ok(ws_message) => { + dbg!(ws_message.clone()); + if let Some(in_message) = InMessage::from_ws_message(ws_message){ + dbg!(in_message.clone()); + let meta = ConnectionMeta { socket_worker_index, socket_worker_slab_index: poll_token.0, @@ -290,15 +334,18 @@ pub fn send_out_messages( // Read messages from channel, send to peers for (meta, out_message) in out_message_receiver { let opt_connection = connections - .get_mut(&meta.socket_worker_slab_index); + .get_mut(&meta.socket_worker_slab_index) + .map(|v| &mut v.stage); - if let Some(Connection::Established(connection)) = opt_connection { + if let Some(ConnectionStage::Established(connection)) = opt_connection { if connection.peer_socket_addr != meta.peer_socket_addr { eprintln!("socket worker: peer socket addrs didn't match"); continue; } + dbg!(out_message.clone()); + match connection.ws.write_message(out_message.to_ws_message()){ Ok(()) => {}, Err(tungstenite::Error::Io(err)) => { diff --git a/aquatic_ws/src/lib/protocol/deserialize.rs b/aquatic_ws/src/lib/protocol/deserialize.rs index 41b276e..df312be 100644 --- a/aquatic_ws/src/lib/protocol/deserialize.rs +++ b/aquatic_ws/src/lib/protocol/deserialize.rs @@ -6,7 +6,7 @@ use super::InfoHash; struct TwentyByteVisitor; impl<'de> Visitor<'de> for TwentyByteVisitor { - type Value = [u8; 20]; + type Value = String; fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { formatter.write_str("string consisting of 20 bytes") @@ -15,24 +15,19 @@ impl<'de> Visitor<'de> for TwentyByteVisitor { fn visit_str(self, value: &str) -> Result where E: ::serde::de::Error, { - let mut arr = [0u8; 20]; - - let bytes = value.as_bytes(); - - if bytes.len() == 20 { - arr.copy_from_slice(&bytes); - - Ok(arr) + if value.chars().count() == 20 { // FIXME + Ok(value.to_string()) } else { Err(E::custom(format!("not 20 bytes: {}", value))) } + } } pub fn deserialize_20_bytes<'de, D>( deserializer: D -) -> Result<[u8; 20], D::Error> +) -> Result where D: Deserializer<'de> { deserializer.deserialize_any(TwentyByteVisitor) @@ -100,11 +95,7 @@ mod tests { use super::*; fn info_hash_from_bytes(bytes: &[u8]) -> InfoHash { - let mut arr = [0u8; 20]; - - arr.copy_from_slice(bytes); - - InfoHash(arr) + InfoHash(String::from_utf8_lossy(bytes).to_string()) } #[test] diff --git a/aquatic_ws/src/lib/protocol/mod.rs b/aquatic_ws/src/lib/protocol/mod.rs index 0741df4..7ca46ee 100644 --- a/aquatic_ws/src/lib/protocol/mod.rs +++ b/aquatic_ws/src/lib/protocol/mod.rs @@ -6,37 +6,37 @@ pub mod deserialize; use deserialize::*; -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] #[serde(transparent)] pub struct PeerId( #[serde(deserialize_with = "deserialize_20_bytes")] - pub [u8; 20] + pub String ); -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)] #[serde(transparent)] pub struct InfoHash( #[serde(deserialize_with = "deserialize_20_bytes")] - pub [u8; 20] + pub String ); -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(transparent)] pub struct OfferId( #[serde(deserialize_with = "deserialize_20_bytes")] - pub [u8; 20] + pub String ); /// Some kind of nested structure from https://www.npmjs.com/package/simple-peer -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(transparent)] pub struct JsonValue(pub ::serde_json::Value); -#[derive(Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "lowercase")] pub enum AnnounceEvent { Started, @@ -56,7 +56,7 @@ impl Default for AnnounceEvent { /// Apparently, these are sent to a number of peers when they are set /// in an AnnounceRequest /// action = "announce" -#[derive(Clone, Serialize)] +#[derive(Debug, Clone, Serialize)] pub struct MiddlemanOfferToPeer { /// Peer id of peer sending offer /// Note: if equal to client peer_id, client ignores offer @@ -72,7 +72,7 @@ pub struct MiddlemanOfferToPeer { /// If announce request has answer = true, send this to peer with /// peer id == "to_peer_id" field /// Action field should be 'announce' -#[derive(Clone, Serialize)] +#[derive(Debug, Clone, Serialize)] pub struct MiddlemanAnswerToPeer { /// Note: if equal to client peer_id, client ignores answer pub peer_id: PeerId, @@ -83,20 +83,20 @@ pub struct MiddlemanAnswerToPeer { /// Element of AnnounceRequest.offers -#[derive(Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize)] pub struct AnnounceRequestOffer { pub offer: JsonValue, pub offer_id: OfferId, } -#[derive(Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize)] pub struct AnnounceRequest { pub info_hash: InfoHash, pub peer_id: PeerId, /// Just called "left" in protocol #[serde(rename = "left")] - pub bytes_left: usize, // FIXME: I had this set as bool before, check! + pub bytes_left: Option, // FIXME: I had this set as bool before, check! /// Can be empty. Then, default is "update" #[serde(default)] pub event: AnnounceEvent, @@ -124,7 +124,7 @@ pub struct AnnounceRequest { } -#[derive(Clone, Serialize)] +#[derive(Debug, Clone, Serialize)] pub struct AnnounceResponse { pub info_hash: InfoHash, /// Client checks if this is null, not clear why @@ -135,7 +135,7 @@ pub struct AnnounceResponse { } -#[derive(Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize)] pub struct ScrapeRequest { // If omitted, scrape for all torrents, apparently // There is some kind of parsing here too which accepts a single info hash @@ -149,7 +149,7 @@ pub struct ScrapeRequest { } -#[derive(Clone, Serialize)] +#[derive(Debug, Clone, Serialize)] pub struct ScrapeStatistics { pub complete: usize, pub incomplete: usize, @@ -157,7 +157,7 @@ pub struct ScrapeStatistics { } -#[derive(Clone, Serialize)] +#[derive(Debug, Clone, Serialize)] pub struct ScrapeResponse { pub files: HashMap, // Looks like `flags` field is ignored in reference client @@ -165,7 +165,7 @@ pub struct ScrapeResponse { } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum Action { Announce, @@ -174,7 +174,7 @@ pub enum Action { /// Helper for serializing and deserializing messages -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] struct ActionWrapper { pub action: Action, #[serde(flatten)] @@ -198,6 +198,7 @@ impl ActionWrapper { } +#[derive(Debug, Clone)] pub enum InMessage { AnnounceRequest(AnnounceRequest), ScrapeRequest(ScrapeRequest), @@ -220,6 +221,8 @@ impl InMessage { if let Ok(ActionWrapper { action: Action::Announce, inner }) = res { return Some(InMessage::AnnounceRequest(inner)); + } else { + dbg!(res); } let res: Result, _> = serde_json::from_str(&text); @@ -233,6 +236,7 @@ impl InMessage { } +#[derive(Debug, Clone)] pub enum OutMessage { AnnounceResponse(AnnounceResponse), ScrapeResponse(ScrapeResponse),