aquatic_udp: in network request reading, send on to channel directly

This commit is contained in:
Joakim Frostegård 2021-10-18 11:06:07 +02:00
parent 6b8616acf9
commit 07a453b6b3

View file

@ -69,7 +69,6 @@ pub fn run_socket_worker(
let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut events = Events::with_capacity(config.network.poll_event_capacity);
let mut connections = ConnectionMap::default(); let mut connections = ConnectionMap::default();
let mut requests: Vec<(ConnectedRequest, SocketAddr)> = Vec::new();
let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new();
let timeout = Duration::from_millis(50); let timeout = Duration::from_millis(50);
@ -91,16 +90,10 @@ pub fn run_socket_worker(
&mut rng, &mut rng,
&mut socket, &mut socket,
&mut buffer, &mut buffer,
&mut requests, &request_sender,
&mut local_responses, &mut local_responses,
); );
for r in requests.drain(..) {
if let Err(err) = request_sender.send(r) {
::log::error!("error sending to request_sender: {}", err);
}
}
state state
.statistics .statistics
.readable_events .readable_events
@ -168,7 +161,7 @@ fn read_requests(
rng: &mut StdRng, rng: &mut StdRng,
socket: &mut UdpSocket, socket: &mut UdpSocket,
buffer: &mut [u8], buffer: &mut [u8],
requests: &mut Vec<(ConnectedRequest, SocketAddr)>, request_sender: &Sender<(ConnectedRequest, SocketAddr)>,
local_responses: &mut Vec<(Response, SocketAddr)>, local_responses: &mut Vec<(Response, SocketAddr)>,
) { ) {
let mut requests_received: usize = 0; let mut requests_received: usize = 0;
@ -208,7 +201,11 @@ fn read_requests(
.access_list .access_list
.allows(access_list_mode, &request.info_hash.0) .allows(access_list_mode, &request.info_hash.0)
{ {
requests.push((ConnectedRequest::Announce(request), src)); if let Err(err) =
request_sender.send((ConnectedRequest::Announce(request), src))
{
::log::warn!("request_sender.send failed: {:?}", err)
}
} else { } else {
let response = Response::Error(ErrorResponse { let response = Response::Error(ErrorResponse {
transaction_id: request.transaction_id, transaction_id: request.transaction_id,
@ -221,7 +218,11 @@ fn read_requests(
} }
Ok(Request::Scrape(request)) => { Ok(Request::Scrape(request)) => {
if connections.contains(request.connection_id, src) { if connections.contains(request.connection_id, src) {
requests.push((ConnectedRequest::Scrape(request), src)); if let Err(err) =
request_sender.send((ConnectedRequest::Scrape(request), src))
{
::log::warn!("request_sender.send failed: {:?}", err)
}
} }
} }
Err(err) => { Err(err) => {
@ -309,7 +310,7 @@ fn send_responses(
::log::info!("send_to error: {}", err); ::log::info!("send_to error: {}", err);
} }
} }
}, }
Err(err) => { Err(err) => {
::log::error!("Response::write error: {:?}", err); ::log::error!("Response::write error: {:?}", err);
} }