diff --git a/aquatic_http/src/lib/handler.rs b/aquatic_http/src/lib/handler.rs index 7697ff5..3ff37be 100644 --- a/aquatic_http/src/lib/handler.rs +++ b/aquatic_http/src/lib/handler.rs @@ -156,7 +156,7 @@ pub fn handle_announce_requests( ResponsePeer::from_peer ); - let response = Response::AnnounceSuccess(AnnounceResponseSuccess { + let response = Response::Announce(AnnounceResponse { complete: torrent_data.num_seeders, incomplete: torrent_data.num_leechers, announce_interval: config.protocol.peer_announce_interval, diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index 6cd9e70..da8f4c8 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -3,6 +3,7 @@ pub mod connection; use std::time::{Duration, Instant}; use std::io::ErrorKind; use std::sync::Arc; +use std::vec::Drain; use hashbrown::HashMap; use log::{info, debug, error}; @@ -75,6 +76,8 @@ pub fn run_poll_loop( let mut poll_token_counter = Token(0usize); let mut iter_counter = 0usize; + let mut local_responses = Vec::new(); + loop { poll.poll(&mut events, Some(poll_timeout)) .expect("failed polling"); @@ -97,6 +100,7 @@ pub fn run_poll_loop( run_handshakes_and_read_requests( socket_worker_index, &request_channel_sender, + &mut local_responses, &mut connections, token, valid_until, @@ -105,6 +109,7 @@ pub fn run_poll_loop( } send_responses( + local_responses.drain(..), response_channel_receiver.drain(), &mut connections ); @@ -166,6 +171,7 @@ fn accept_new_streams( pub fn run_handshakes_and_read_requests( socket_worker_index: usize, request_channel_sender: &RequestChannelSender, + local_responses: &mut Vec<(ConnectionMeta, Response)>, connections: &mut ConnectionMap, poll_token: Token, valid_until: ValidUntil, @@ -209,6 +215,25 @@ pub fn run_handshakes_and_read_requests( // Stop reading data (defer to later events) break; }, + Err(RequestReadError::Invalid(err)) => { + info!("error reading request (invalid): {}", err); + + let meta = ConnectionMeta { + worker_index: socket_worker_index, + poll_token, + peer_addr: established.peer_addr + }; + + let response = FailureResponse { + failure_reason: "invalid request".to_string() + }; + + local_responses.push( + (meta, Response::Failure(response)) + ); + + break; + }, Err(err) => { info!("error reading request: {:?}", err); @@ -243,10 +268,11 @@ pub fn run_handshakes_and_read_requests( /// Read responses from channel, send to peers pub fn send_responses( + local_responses: Drain<(ConnectionMeta, Response)>, response_channel_receiver: ::flume::Drain<(ConnectionMeta, Response)>, connections: &mut ConnectionMap, ){ - for (meta, response) in response_channel_receiver { + for (meta, response) in local_responses.chain(response_channel_receiver){ if let Some(established) = connections.get_mut(&meta.poll_token) .and_then(|c| c.inner.as_mut().left()) { diff --git a/aquatic_http/src/lib/protocol/response.rs b/aquatic_http/src/lib/protocol/response.rs index b844e6b..039696f 100644 --- a/aquatic_http/src/lib/protocol/response.rs +++ b/aquatic_http/src/lib/protocol/response.rs @@ -37,7 +37,7 @@ pub struct ScrapeStatistics { #[derive(Debug, Clone, Serialize)] -pub struct AnnounceResponseSuccess { +pub struct AnnounceResponse { #[serde(rename = "interval")] pub announce_interval: usize, pub tracker_id: String, // Optional?? @@ -50,24 +50,24 @@ pub struct AnnounceResponseSuccess { } -#[derive(Debug, Clone, Serialize)] -pub struct AnnounceResponseFailure { - pub failure_reason: String, -} - - #[derive(Debug, Clone, Serialize)] pub struct ScrapeResponse { pub files: HashMap, } +#[derive(Debug, Clone, Serialize)] +pub struct FailureResponse { + pub failure_reason: String, +} + + #[derive(Debug, Clone, Serialize)] #[serde(untagged)] pub enum Response { - AnnounceSuccess(AnnounceResponseSuccess), - AnnounceFailure(AnnounceResponseFailure), - Scrape(ScrapeResponse) + Announce(AnnounceResponse), + Scrape(ScrapeResponse), + Failure(FailureResponse), }