From b5aa07c21f4660e7dac2dcf321fa9017564d53a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 11:20:59 +0200 Subject: [PATCH] aquatic_udp: glommio networking: use task-per-receiver --- aquatic_udp/src/lib/glommio/handlers.rs | 9 ++++--- aquatic_udp/src/lib/glommio/network.rs | 33 +++++++++++++------------ 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 14e0e16..0fff931 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -32,8 +32,9 @@ pub async fn run_request_worker( torrents_ipv4.clone(), torrents_ipv6.clone(), response_senders.clone(), - receiver - ).await; + receiver, + ) + .await; } } @@ -43,7 +44,9 @@ async fn handle_request_stream( torrents_ipv6: Rc>>, response_senders: Rc>, mut stream: S, -) where S: Stream + ::std::marker::Unpin { +) where + S: Stream + ::std::marker::Unpin, +{ let mut rng = SmallRng::from_entropy(); // Needs to be updated periodically: use timer? diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 834d07a..427ec61 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -6,9 +6,9 @@ use std::sync::{ Arc, }; -use futures_lite::StreamExt; -use glommio::channels::channel_mesh::{MeshBuilder, Partial, Receivers, Role, Senders}; -use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender}; +use futures_lite::{Stream, StreamExt}; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; +use glommio::channels::local_channel::{new_unbounded, LocalSender}; use glommio::net::UdpSocket; use glommio::prelude::*; use rand::prelude::{Rng, SeedableRng, StdRng}; @@ -40,7 +40,7 @@ pub async fn run_socket_worker( let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); - let (_, response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); + let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); let response_consumer_index = response_receivers.consumer_id().unwrap(); @@ -53,7 +53,15 @@ pub async fn run_socket_worker( )) .await; - spawn_local(send_responses(response_receivers, local_receiver, socket)).await; + for (_, receiver) in response_receivers.streams().into_iter() { + spawn_local(send_responses( + socket.clone(), + receiver.map(|(response, addr)| (response.into(), addr)), + )) + .await; + } + + send_responses(socket, local_receiver.stream()).await; } async fn read_requests( @@ -156,20 +164,13 @@ async fn read_requests( } } -async fn send_responses( - mut response_receivers: Receivers<(AnnounceResponse, SocketAddr)>, - local_receiver: LocalReceiver<(Response, SocketAddr)>, - socket: Rc, -) { +async fn send_responses(socket: Rc, mut stream: S) +where + S: Stream + ::std::marker::Unpin, +{ let mut buf = [0u8; MAX_PACKET_SIZE]; let mut buf = Cursor::new(&mut buf[..]); - let mut stream = local_receiver.stream().boxed_local(); - - for (_, receiver) in response_receivers.streams().into_iter() { - stream = Box::pin(stream.or(receiver.map(|(response, addr)| (response.into(), addr)))); - } - while let Some((response, src)) = stream.next().await { buf.set_position(0);