diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index d7b5220..14e0e16 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -1,8 +1,9 @@ +use std::cell::RefCell; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::rc::Rc; -use futures_lite::stream::empty; -use futures_lite::StreamExt; -use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; +use futures_lite::{Stream, StreamExt}; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::prelude::*; use rand::prelude::SmallRng; use rand::SeedableRng; @@ -19,27 +20,41 @@ pub async fn run_request_worker( let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); - let mut rng = SmallRng::from_entropy(); + let response_senders = Rc::new(response_senders); // Need to be cleaned periodically: use timer? - let mut torrents_ipv4 = TorrentMap::::default(); - let mut torrents_ipv6 = TorrentMap::::default(); + let torrents_ipv4 = Rc::new(RefCell::new(TorrentMap::::default())); + let torrents_ipv6 = Rc::new(RefCell::new(TorrentMap::::default())); + + for (_, receiver) in request_receivers.streams() { + handle_request_stream( + &config, + torrents_ipv4.clone(), + torrents_ipv6.clone(), + response_senders.clone(), + receiver + ).await; + } +} + +async fn handle_request_stream( + config: &Config, + torrents_ipv4: Rc>>, + torrents_ipv6: Rc>>, + response_senders: Rc>, + mut stream: S, +) where S: Stream + ::std::marker::Unpin { + let mut rng = SmallRng::from_entropy(); // Needs to be updated periodically: use timer? let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - let mut stream = empty().boxed_local(); - - for (_, receiver) in request_receivers.streams() { - stream = Box::pin(stream.or(receiver)); - } - while let Some((producer_index, request, addr)) = stream.next().await { let response = match addr.ip() { IpAddr::V4(ip) => handle_announce_request( &config, &mut rng, - &mut torrents_ipv4, + &mut torrents_ipv4.borrow_mut(), request, ip, peer_valid_until, @@ -47,7 +62,7 @@ pub async fn run_request_worker( IpAddr::V6(ip) => handle_announce_request( &config, &mut rng, - &mut torrents_ipv6, + &mut torrents_ipv6.borrow_mut(), request, ip, peer_valid_until,