mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-02 02:35:31 +00:00
udp: handle request parse errors outside of handle_request function
This commit is contained in:
parent
4587c267d6
commit
5e28f5a498
1 changed files with 44 additions and 38 deletions
|
|
@ -100,7 +100,7 @@ impl SocketWorker {
|
||||||
|
|
||||||
for event in events.iter() {
|
for event in events.iter() {
|
||||||
if event.is_readable() {
|
if event.is_readable() {
|
||||||
self.read_requests(&mut local_responses, pending_scrape_valid_until);
|
self.read_and_handle_requests(&mut local_responses, pending_scrape_valid_until);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -175,7 +175,7 @@ impl SocketWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_requests(
|
fn read_and_handle_requests(
|
||||||
&mut self,
|
&mut self,
|
||||||
local_responses: &mut Vec<(Response, CanonicalSocketAddr)>,
|
local_responses: &mut Vec<(Response, CanonicalSocketAddr)>,
|
||||||
pending_scrape_valid_until: ValidUntil,
|
pending_scrape_valid_until: ValidUntil,
|
||||||
|
|
@ -194,32 +194,57 @@ impl SocketWorker {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let res_request = Request::from_bytes(
|
let src = CanonicalSocketAddr::new(src);
|
||||||
|
|
||||||
|
let request_parsable = match Request::from_bytes(
|
||||||
&self.buffer[..bytes_read],
|
&self.buffer[..bytes_read],
|
||||||
self.config.protocol.max_scrape_torrents,
|
self.config.protocol.max_scrape_torrents,
|
||||||
);
|
) {
|
||||||
|
Ok(request) => {
|
||||||
|
self.handle_request(
|
||||||
|
local_responses,
|
||||||
|
pending_scrape_valid_until,
|
||||||
|
request,
|
||||||
|
src,
|
||||||
|
);
|
||||||
|
|
||||||
let src = CanonicalSocketAddr::new(src);
|
true
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
::log::debug!("Request::from_bytes error: {:?}", err);
|
||||||
|
|
||||||
|
if let RequestParseError::Sendable {
|
||||||
|
connection_id,
|
||||||
|
transaction_id,
|
||||||
|
err,
|
||||||
|
} = err
|
||||||
|
{
|
||||||
|
if self.validator.connection_id_valid(src, connection_id) {
|
||||||
|
let response = ErrorResponse {
|
||||||
|
transaction_id,
|
||||||
|
message: err.right_or("Parse error").into(),
|
||||||
|
};
|
||||||
|
|
||||||
|
local_responses.push((response.into(), src));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Update statistics for converted address
|
// Update statistics for converted address
|
||||||
if src.is_ipv4() {
|
if src.is_ipv4() {
|
||||||
if res_request.is_ok() {
|
if request_parsable {
|
||||||
requests_received_ipv4 += 1;
|
requests_received_ipv4 += 1;
|
||||||
}
|
}
|
||||||
bytes_received_ipv4 += bytes_read;
|
bytes_received_ipv4 += bytes_read;
|
||||||
} else {
|
} else {
|
||||||
if res_request.is_ok() {
|
if request_parsable {
|
||||||
requests_received_ipv6 += 1;
|
requests_received_ipv6 += 1;
|
||||||
}
|
}
|
||||||
bytes_received_ipv6 += bytes_read;
|
bytes_received_ipv6 += bytes_read;
|
||||||
}
|
}
|
||||||
|
|
||||||
self.handle_request(
|
|
||||||
local_responses,
|
|
||||||
pending_scrape_valid_until,
|
|
||||||
res_request,
|
|
||||||
src,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
Err(err) if err.kind() == ErrorKind::WouldBlock => {
|
Err(err) if err.kind() == ErrorKind::WouldBlock => {
|
||||||
break;
|
break;
|
||||||
|
|
@ -254,13 +279,13 @@ impl SocketWorker {
|
||||||
&mut self,
|
&mut self,
|
||||||
local_responses: &mut Vec<(Response, CanonicalSocketAddr)>,
|
local_responses: &mut Vec<(Response, CanonicalSocketAddr)>,
|
||||||
pending_scrape_valid_until: ValidUntil,
|
pending_scrape_valid_until: ValidUntil,
|
||||||
res_request: Result<Request, RequestParseError>,
|
request: Request,
|
||||||
src: CanonicalSocketAddr,
|
src: CanonicalSocketAddr,
|
||||||
) {
|
) {
|
||||||
let access_list_mode = self.config.access_list.mode;
|
let access_list_mode = self.config.access_list.mode;
|
||||||
|
|
||||||
match res_request {
|
match request {
|
||||||
Ok(Request::Connect(request)) => {
|
Request::Connect(request) => {
|
||||||
let connection_id = self.validator.create_connection_id(src);
|
let connection_id = self.validator.create_connection_id(src);
|
||||||
|
|
||||||
let response = Response::Connect(ConnectResponse {
|
let response = Response::Connect(ConnectResponse {
|
||||||
|
|
@ -270,7 +295,7 @@ impl SocketWorker {
|
||||||
|
|
||||||
local_responses.push((response, src))
|
local_responses.push((response, src))
|
||||||
}
|
}
|
||||||
Ok(Request::Announce(request)) => {
|
Request::Announce(request) => {
|
||||||
if self
|
if self
|
||||||
.validator
|
.validator
|
||||||
.connection_id_valid(src, request.connection_id)
|
.connection_id_valid(src, request.connection_id)
|
||||||
|
|
@ -298,7 +323,7 @@ impl SocketWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(Request::Scrape(request)) => {
|
Request::Scrape(request) => {
|
||||||
if self
|
if self
|
||||||
.validator
|
.validator
|
||||||
.connection_id_valid(src, request.connection_id)
|
.connection_id_valid(src, request.connection_id)
|
||||||
|
|
@ -318,25 +343,6 @@ impl SocketWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {
|
|
||||||
::log::debug!("Request::from_bytes error: {:?}", err);
|
|
||||||
|
|
||||||
if let RequestParseError::Sendable {
|
|
||||||
connection_id,
|
|
||||||
transaction_id,
|
|
||||||
err,
|
|
||||||
} = err
|
|
||||||
{
|
|
||||||
if self.validator.connection_id_valid(src, connection_id) {
|
|
||||||
let response = ErrorResponse {
|
|
||||||
transaction_id,
|
|
||||||
message: err.right_or("Parse error").into(),
|
|
||||||
};
|
|
||||||
|
|
||||||
local_responses.push((response.into(), src));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue