aquatic_udp: glommio networking: use task-per-receiver

This commit is contained in:
Joakim Frostegård 2021-10-19 11:20:59 +02:00
parent 68b2bdd4a6
commit b5aa07c21f
2 changed files with 23 additions and 19 deletions

View file

@ -32,8 +32,9 @@ pub async fn run_request_worker(
torrents_ipv4.clone(), torrents_ipv4.clone(),
torrents_ipv6.clone(), torrents_ipv6.clone(),
response_senders.clone(), response_senders.clone(),
receiver receiver,
).await; )
.await;
} }
} }
@ -43,7 +44,9 @@ async fn handle_request_stream<S>(
torrents_ipv6: Rc<RefCell<TorrentMap<Ipv6Addr>>>, torrents_ipv6: Rc<RefCell<TorrentMap<Ipv6Addr>>>,
response_senders: Rc<Senders<(AnnounceResponse, SocketAddr)>>, response_senders: Rc<Senders<(AnnounceResponse, SocketAddr)>>,
mut stream: S, mut stream: S,
) where S: Stream<Item = (usize, AnnounceRequest, SocketAddr)> + ::std::marker::Unpin { ) where
S: Stream<Item = (usize, AnnounceRequest, SocketAddr)> + ::std::marker::Unpin,
{
let mut rng = SmallRng::from_entropy(); let mut rng = SmallRng::from_entropy();
// Needs to be updated periodically: use timer? // Needs to be updated periodically: use timer?

View file

@ -6,9 +6,9 @@ use std::sync::{
Arc, Arc,
}; };
use futures_lite::StreamExt; use futures_lite::{Stream, StreamExt};
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Receivers, Role, Senders}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender}; use glommio::channels::local_channel::{new_unbounded, LocalSender};
use glommio::net::UdpSocket; use glommio::net::UdpSocket;
use glommio::prelude::*; use glommio::prelude::*;
use rand::prelude::{Rng, SeedableRng, StdRng}; use rand::prelude::{Rng, SeedableRng, StdRng};
@ -40,7 +40,7 @@ pub async fn run_socket_worker(
let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
let (_, response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap();
let response_consumer_index = response_receivers.consumer_id().unwrap(); let response_consumer_index = response_receivers.consumer_id().unwrap();
@ -53,7 +53,15 @@ pub async fn run_socket_worker(
)) ))
.await; .await;
spawn_local(send_responses(response_receivers, local_receiver, socket)).await; for (_, receiver) in response_receivers.streams().into_iter() {
spawn_local(send_responses(
socket.clone(),
receiver.map(|(response, addr)| (response.into(), addr)),
))
.await;
}
send_responses(socket, local_receiver.stream()).await;
} }
async fn read_requests( async fn read_requests(
@ -156,20 +164,13 @@ async fn read_requests(
} }
} }
async fn send_responses( async fn send_responses<S>(socket: Rc<UdpSocket>, mut stream: S)
mut response_receivers: Receivers<(AnnounceResponse, SocketAddr)>, where
local_receiver: LocalReceiver<(Response, SocketAddr)>, S: Stream<Item = (Response, SocketAddr)> + ::std::marker::Unpin,
socket: Rc<UdpSocket>, {
) {
let mut buf = [0u8; MAX_PACKET_SIZE]; let mut buf = [0u8; MAX_PACKET_SIZE];
let mut buf = Cursor::new(&mut buf[..]); let mut buf = Cursor::new(&mut buf[..]);
let mut stream = local_receiver.stream().boxed_local();
for (_, receiver) in response_receivers.streams().into_iter() {
stream = Box::pin(stream.or(receiver.map(|(response, addr)| (response.into(), addr))));
}
while let Some((response, src)) = stream.next().await { while let Some((response, src)) = stream.next().await {
buf.set_position(0); buf.set_position(0);