From b30da1a930e9dfc942e1efa154e644a64be158f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 5 Jul 2022 12:03:51 +0200 Subject: [PATCH] ws: store peer_id and announced info hashes in ConnectionReference --- aquatic_ws/src/workers/socket.rs | 64 ++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 42dfb5f..8c04798 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -25,7 +25,7 @@ use glommio::net::{TcpListener, TcpStream}; use glommio::task::JoinHandle; use glommio::timer::{sleep, timeout, TimerActionRepeat}; use glommio::{enclose, prelude::*}; -use hashbrown::HashMap; +use hashbrown::{HashMap, HashSet}; use slab::Slab; use crate::config::Config; @@ -46,6 +46,8 @@ struct ConnectionReference { out_message_sender: Rc>, /// Updated after sending message to peer valid_until: ValidUntil, + peer_id: Option, + announced_info_hashes: HashSet, } pub async fn run_socket_worker( @@ -112,6 +114,8 @@ pub async fn run_socket_worker( task_handle: None, out_message_sender: out_message_sender.clone(), valid_until: ValidUntil::new(config.cleaning.max_connection_idle), + peer_id: None, + announced_info_hashes: Default::default(), }); ::log::info!("accepting stream: {}", key); @@ -240,10 +244,11 @@ async fn run_connection( let access_list_cache = create_access_list_cache(&access_list); let reader_handle = spawn_local_into( - enclose!((config, pending_scrape_slab) async move { + enclose!((config, connection_slab, pending_scrape_slab) async move { let mut reader = ConnectionReader { config, access_list_cache, + connection_slab, in_message_senders, out_message_sender, pending_scrape_slab, @@ -289,6 +294,7 @@ async fn run_connection( struct ConnectionReader { config: Rc, access_list_cache: AccessListCache, + connection_slab: Rc>>, in_message_senders: Rc>, out_message_sender: Rc>, pending_scrape_slab: Rc>>, @@ -320,7 +326,7 @@ impl ConnectionReader { Err(err) => { ::log::debug!("Couldn't parse in_message: {:?}", err); - self.send_error_response("Invalid request".into(), None) + self.send_error_response("Invalid request".into(), None, None) .await?; } } @@ -339,6 +345,39 @@ impl ConnectionReader { .load() .allows(self.config.access_list.mode, &info_hash.0) { + { + let mut connection_slab = self.connection_slab.borrow_mut(); + + let connection_reference = connection_slab + .get_mut(self.connection_id.0) + .ok_or_else(|| { + anyhow::anyhow!( + "connection reference {} not found in slab", + self.connection_id.0 + ) + })?; + + match &mut connection_reference.peer_id { + Some(peer_id) if *peer_id != announce_request.peer_id => { + self.send_error_response( + "Only one peer id can be used per connection".into(), + Some(ErrorResponseAction::Announce), + Some(info_hash), + ) + .await?; + return Err(anyhow::anyhow!("Peer used more than one PeerId")); + } + Some(_) => (), + opt_peer_id @ None => { + *opt_peer_id = Some(announce_request.peer_id); + } + } + + connection_reference + .announced_info_hashes + .insert(announce_request.info_hash); + } + let in_message = InMessage::AnnounceRequest(announce_request); let consumer_index = @@ -354,8 +393,12 @@ impl ConnectionReader { .unwrap(); ::log::info!("sent message to swarm worker"); } else { - self.send_error_response("Info hash not allowed".into(), Some(info_hash)) - .await?; + self.send_error_response( + "Info hash not allowed".into(), + Some(ErrorResponseAction::Announce), + Some(info_hash), + ) + .await?; } } InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => { @@ -364,8 +407,12 @@ impl ConnectionReader { } else { // If request.info_hashes is empty, don't return scrape for all // torrents, even though reference server does it. It is too expensive. - self.send_error_response("Full scrapes are not allowed".into(), None) - .await?; + self.send_error_response( + "Full scrapes are not allowed".into(), + Some(ErrorResponseAction::Scrape), + None, + ) + .await?; return Ok(()); }; @@ -415,10 +462,11 @@ impl ConnectionReader { async fn send_error_response( &self, failure_reason: Cow<'static, str>, + action: Option, info_hash: Option, ) -> anyhow::Result<()> { let out_message = OutMessage::ErrorResponse(ErrorResponse { - action: Some(ErrorResponseAction::Scrape), + action, failure_reason, info_hash, });