diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index ad28f75..c9b0316 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -3,7 +3,6 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use futures_lite::stream::empty; use futures_lite::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; -use glommio::prelude::*; use rand::prelude::SmallRng; use rand::SeedableRng; @@ -11,58 +10,51 @@ use crate::common::announce::handle_announce_request; use crate::common::*; use crate::config::Config; -pub fn run_request_worker( +pub async fn run_request_worker( config: Config, request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, ) { - LocalExecutorBuilder::default() - .spawn(|| async move { - let (_, mut request_receivers) = - request_mesh_builder.join(Role::Consumer).await.unwrap(); - let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); + let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); + let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); - let mut rng = SmallRng::from_entropy(); + let mut rng = SmallRng::from_entropy(); - // Need to be cleaned periodically: use timer? - let mut torrents_ipv4 = TorrentMap::::default(); - let mut torrents_ipv6 = TorrentMap::::default(); + // Need to be cleaned periodically: use timer? + let mut torrents_ipv4 = TorrentMap::::default(); + let mut torrents_ipv6 = TorrentMap::::default(); - // Needs to be updated periodically: use timer? - let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + // Needs to be updated periodically: use timer? + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - let mut stream = empty().boxed_local(); + let mut stream = empty().boxed_local(); - for (_, receiver) in request_receivers.streams() { - stream = Box::pin(stream.race(receiver)); - } + for (_, receiver) in request_receivers.streams() { + stream = Box::pin(stream.race(receiver)); + } - while let Some((producer_index, request, addr)) = stream.next().await { - let response = match addr.ip() { - IpAddr::V4(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents_ipv4, - request, - ip, - peer_valid_until, - ), - IpAddr::V6(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents_ipv6, - request, - ip, - peer_valid_until, - ), - }; + while let Some((producer_index, request, addr)) = stream.next().await { + let response = match addr.ip() { + IpAddr::V4(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents_ipv4, + request, + ip, + peer_valid_until, + ), + IpAddr::V6(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents_ipv6, + request, + ip, + peer_valid_until, + ), + }; - if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { - ::log::warn!("response_sender.try_send: {:?}", err); - } - } - }) - .expect("failed to spawn local executor") - .join() - .unwrap(); + if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { + ::log::warn!("response_sender.try_send: {:?}", err); + } + } } diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index b33e784..89c6b9b 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,6 +1,7 @@ use std::sync::{atomic::AtomicUsize, Arc}; use glommio::channels::channel_mesh::MeshBuilder; +use glommio::prelude::*; use crate::config::Config; @@ -15,21 +16,44 @@ pub fn run(config: Config) -> anyhow::Result<()> { let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + let mut executors = Vec::new(); + for _ in 0..(config.socket_workers) { - network::run_socket_worker( - config.clone(), - request_mesh_builder.clone(), - response_mesh_builder.clone(), - num_bound_sockets.clone(), - ); + let config = config.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + let num_bound_sockets = num_bound_sockets.clone(); + + let executor = LocalExecutorBuilder::default().spawn(|| async move { + network::run_socket_worker( + config, + request_mesh_builder, + response_mesh_builder, + num_bound_sockets, + ) + .await + }); + + executors.push(executor); } for _ in 0..(config.request_workers) { - handlers::run_request_worker( - config.clone(), - request_mesh_builder.clone(), - response_mesh_builder.clone(), - ); + let config = config.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + + let executor = LocalExecutorBuilder::default().spawn(|| async move { + handlers::run_request_worker(config, request_mesh_builder, response_mesh_builder).await + }); + + executors.push(executor); + } + + for executor in executors { + executor + .expect("failed to spawn local executor") + .join() + .unwrap(); } Ok(()) diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index a4c3688..f685e8f 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -1,5 +1,3 @@ -/// TODO -/// - Don't use race, use other means to receive from multiple channels use std::io::Cursor; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; @@ -20,47 +18,42 @@ use aquatic_udp_protocol::{IpVersion, Request, Response}; use crate::common::*; use crate::config::Config; -pub fn run_socket_worker( +pub async fn run_socket_worker( config: Config, request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, num_bound_sockets: Arc, ) { - LocalExecutorBuilder::default() - .spawn(|| async move { - let (local_sender, local_receiver) = new_unbounded(); + let (local_sender, local_receiver) = new_unbounded(); - let mut socket = UdpSocket::bind(config.network.address).unwrap(); + let mut socket = UdpSocket::bind(config.network.address).unwrap(); - let recv_buffer_size = config.network.socket_recv_buffer_size; + let recv_buffer_size = config.network.socket_recv_buffer_size; - if recv_buffer_size != 0 { - socket.set_buffer_size(recv_buffer_size); - } + if recv_buffer_size != 0 { + socket.set_buffer_size(recv_buffer_size); + } - let socket = Rc::new(socket); + let socket = Rc::new(socket); - num_bound_sockets.fetch_add(1, Ordering::SeqCst); + num_bound_sockets.fetch_add(1, Ordering::SeqCst); - let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); + let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); - let (_, response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); + let (_, response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); - let response_consumer_index = response_receivers.consumer_id().unwrap(); + let response_consumer_index = response_receivers.consumer_id().unwrap(); - spawn_local(read_requests( - config.clone(), - request_senders, - response_consumer_index, - local_sender, - socket.clone(), - )) - .await; - spawn_local(send_responses(response_receivers, local_receiver, socket)).await; - }) - .expect("failed to spawn local executor") - .join() - .unwrap(); + spawn_local(read_requests( + config.clone(), + request_senders, + response_consumer_index, + local_sender, + socket.clone(), + )) + .await; + + spawn_local(send_responses(response_receivers, local_receiver, socket)).await; } async fn read_requests(