aquatic_udp: glommio: start executors in mod.rs, update TODO

This commit is contained in:
Joakim Frostegård 2021-10-19 01:53:47 +02:00
parent 3aebdfda8a
commit c6ba1bc61c
3 changed files with 93 additions and 84 deletions

View file

@ -1,5 +1,3 @@
/// TODO
/// - Don't use race, use other means to receive from multiple channels
use std::io::Cursor;
use std::net::{IpAddr, SocketAddr};
use std::rc::Rc;
@ -20,47 +18,42 @@ use aquatic_udp_protocol::{IpVersion, Request, Response};
use crate::common::*;
use crate::config::Config;
pub fn run_socket_worker(
pub async fn run_socket_worker(
config: Config,
request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>,
response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>,
num_bound_sockets: Arc<AtomicUsize>,
) {
LocalExecutorBuilder::default()
.spawn(|| async move {
let (local_sender, local_receiver) = new_unbounded();
let (local_sender, local_receiver) = new_unbounded();
let mut socket = UdpSocket::bind(config.network.address).unwrap();
let mut socket = UdpSocket::bind(config.network.address).unwrap();
let recv_buffer_size = config.network.socket_recv_buffer_size;
let recv_buffer_size = config.network.socket_recv_buffer_size;
if recv_buffer_size != 0 {
socket.set_buffer_size(recv_buffer_size);
}
if recv_buffer_size != 0 {
socket.set_buffer_size(recv_buffer_size);
}
let socket = Rc::new(socket);
let socket = Rc::new(socket);
num_bound_sockets.fetch_add(1, Ordering::SeqCst);
num_bound_sockets.fetch_add(1, Ordering::SeqCst);
let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
let (_, response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap();
let (_, response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap();
let response_consumer_index = response_receivers.consumer_id().unwrap();
let response_consumer_index = response_receivers.consumer_id().unwrap();
spawn_local(read_requests(
config.clone(),
request_senders,
response_consumer_index,
local_sender,
socket.clone(),
))
.await;
spawn_local(send_responses(response_receivers, local_receiver, socket)).await;
})
.expect("failed to spawn local executor")
.join()
.unwrap();
spawn_local(read_requests(
config.clone(),
request_senders,
response_consumer_index,
local_sender,
socket.clone(),
))
.await;
spawn_local(send_responses(response_receivers, local_receiver, socket)).await;
}
async fn read_requests(