From f0ec858bef73d148df9a17957a22a317f076e6c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 13 Apr 2020 17:09:42 +0200 Subject: [PATCH] socket workers: add Vec for requests (send together) and responses Local response vector is used for error responses --- aquatic/src/lib/lib.rs | 2 -- aquatic/src/lib/network.rs | 31 +++++++++++++++++++++++-------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/aquatic/src/lib/lib.rs b/aquatic/src/lib/lib.rs index b51768b..7da6021 100644 --- a/aquatic/src/lib/lib.rs +++ b/aquatic/src/lib/lib.rs @@ -39,7 +39,6 @@ pub fn run(config: Config){ let state = state.clone(); let config = config.clone(); let request_sender = request_sender.clone(); - let response_sender = response_sender.clone(); let response_receiver = response_receiver.clone(); Builder::new().name(format!("socket-worker-{}", i + 1)).spawn(move || @@ -48,7 +47,6 @@ pub fn run(config: Config){ config, i, request_sender, - response_sender, response_receiver, ) ).expect("spawn socket worker"); diff --git a/aquatic/src/lib/network.rs b/aquatic/src/lib/network.rs index b7f1475..decb620 100644 --- a/aquatic/src/lib/network.rs +++ b/aquatic/src/lib/network.rs @@ -2,6 +2,7 @@ use std::sync::atomic::Ordering; use std::io::{Cursor, ErrorKind}; use std::net::SocketAddr; use std::time::Duration; +use std::vec::Drain; use crossbeam_channel::{Sender, Receiver}; use mio::{Events, Poll, Interest, Token}; @@ -21,7 +22,6 @@ pub fn run_socket_worker( config: Config, token_num: usize, request_sender: Sender<(Request, SocketAddr)>, - response_sender: Sender<(Response, SocketAddr)>, response_receiver: Receiver<(Response, SocketAddr)>, ){ let mut buffer = [0u8; MAX_PACKET_SIZE]; @@ -37,6 +37,9 @@ pub fn run_socket_worker( let mut events = Events::with_capacity(config.network.poll_event_capacity); + let mut requests: Vec<(Request, SocketAddr)> = Vec::new(); + let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); + let timeout = Duration::from_millis(50); loop { @@ -53,10 +56,16 @@ pub fn run_socket_worker( &config, &mut socket, &mut buffer, - &request_sender, - &response_sender + &mut requests, + &mut local_responses, ); + for r in requests.drain(..){ + if let Err(err) = request_sender.send(r){ + eprintln!("error sending to request_sender: {}", err); + } + } + state.statistics.readable_events.fetch_add(1, Ordering::SeqCst); poll.registry() @@ -72,6 +81,7 @@ pub fn run_socket_worker( &mut socket, &mut buffer, &response_receiver, + local_responses.drain(..) ); } } @@ -117,8 +127,8 @@ fn read_requests( config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - request_sender: &Sender<(Request, SocketAddr)>, - response_sender: &Sender<(Response, SocketAddr)>, + requests: &mut Vec<(Request, SocketAddr)>, + local_responses: &mut Vec<(Response, SocketAddr)>, ){ let mut requests_received: usize = 0; let mut bytes_received: usize = 0; @@ -139,7 +149,7 @@ fn read_requests( match request { Ok(request) => { - request_sender.try_send((request, src)); + requests.push((request, src)); }, Err(err) => { eprintln!("request_from_bytes error: {:?}", err); @@ -159,7 +169,7 @@ fn read_requests( message, }; - response_sender.try_send((response.into(), src)); + local_responses.push((response.into(), src)); } } }, @@ -191,13 +201,18 @@ fn send_responses( socket: &mut UdpSocket, buffer: &mut [u8], response_receiver: &Receiver<(Response, SocketAddr)>, + local_responses: Drain<(Response, SocketAddr)>, ){ let mut responses_sent: usize = 0; let mut bytes_sent: usize = 0; let mut cursor = Cursor::new(buffer); - for (response, src) in response_receiver.try_iter(){ + let response_iterator = local_responses.into_iter().chain( + response_receiver.try_iter() + ); + + for (response, src) in response_iterator { cursor.set_position(0); response_to_bytes(&mut cursor, response, IpVersion::IPv4).unwrap();