diff --git a/aquatic_udp/src/workers/socket/mod.rs b/aquatic_udp/src/workers/socket/mod.rs index 2c196e8..72490d4 100644 --- a/aquatic_udp/src/workers/socket/mod.rs +++ b/aquatic_udp/src/workers/socket/mod.rs @@ -100,7 +100,7 @@ impl SocketWorker { for event in events.iter() { if event.is_readable() { - self.read_requests(&mut local_responses, pending_scrape_valid_until); + self.read_and_handle_requests(&mut local_responses, pending_scrape_valid_until); } } @@ -175,7 +175,7 @@ impl SocketWorker { } } - fn read_requests( + fn read_and_handle_requests( &mut self, local_responses: &mut Vec<(Response, CanonicalSocketAddr)>, pending_scrape_valid_until: ValidUntil, @@ -194,32 +194,57 @@ impl SocketWorker { continue; } - let res_request = Request::from_bytes( + let src = CanonicalSocketAddr::new(src); + + let request_parsable = match Request::from_bytes( &self.buffer[..bytes_read], self.config.protocol.max_scrape_torrents, - ); + ) { + Ok(request) => { + self.handle_request( + local_responses, + pending_scrape_valid_until, + request, + src, + ); - let src = CanonicalSocketAddr::new(src); + true + } + Err(err) => { + ::log::debug!("Request::from_bytes error: {:?}", err); + + if let RequestParseError::Sendable { + connection_id, + transaction_id, + err, + } = err + { + if self.validator.connection_id_valid(src, connection_id) { + let response = ErrorResponse { + transaction_id, + message: err.right_or("Parse error").into(), + }; + + local_responses.push((response.into(), src)); + } + } + + false + } + }; // Update statistics for converted address if src.is_ipv4() { - if res_request.is_ok() { + if request_parsable { requests_received_ipv4 += 1; } bytes_received_ipv4 += bytes_read; } else { - if res_request.is_ok() { + if request_parsable { requests_received_ipv6 += 1; } bytes_received_ipv6 += bytes_read; } - - self.handle_request( - local_responses, - pending_scrape_valid_until, - res_request, - src, - ); } Err(err) if err.kind() == ErrorKind::WouldBlock => { break; @@ -254,13 +279,13 @@ impl SocketWorker { &mut self, local_responses: &mut Vec<(Response, CanonicalSocketAddr)>, pending_scrape_valid_until: ValidUntil, - res_request: Result, + request: Request, src: CanonicalSocketAddr, ) { let access_list_mode = self.config.access_list.mode; - match res_request { - Ok(Request::Connect(request)) => { + match request { + Request::Connect(request) => { let connection_id = self.validator.create_connection_id(src); let response = Response::Connect(ConnectResponse { @@ -270,7 +295,7 @@ impl SocketWorker { local_responses.push((response, src)) } - Ok(Request::Announce(request)) => { + Request::Announce(request) => { if self .validator .connection_id_valid(src, request.connection_id) @@ -298,7 +323,7 @@ impl SocketWorker { } } } - Ok(Request::Scrape(request)) => { + Request::Scrape(request) => { if self .validator .connection_id_valid(src, request.connection_id) @@ -318,25 +343,6 @@ impl SocketWorker { } } } - Err(err) => { - ::log::debug!("Request::from_bytes error: {:?}", err); - - if let RequestParseError::Sendable { - connection_id, - transaction_id, - err, - } = err - { - if self.validator.connection_id_valid(src, connection_id) { - let response = ErrorResponse { - transaction_id, - message: err.right_or("Parse error").into(), - }; - - local_responses.push((response.into(), src)); - } - } - } } }