From 4a8651e1c617bc276a7e20496b1db12115510831 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 19 Oct 2021 01:01:10 +0200 Subject: [PATCH] aquatic_udp: use mesh in glommio version --- aquatic_udp/src/lib/common/announce.rs | 2 +- aquatic_udp/src/lib/common/network.rs | 1 + aquatic_udp/src/lib/glommio/handlers.rs | 54 +++++++++++++------- aquatic_udp/src/lib/glommio/mod.rs | 32 ++++++++++++ aquatic_udp/src/lib/glommio/network.rs | 52 ++++++++++--------- aquatic_udp/src/lib/mio/handlers/announce.rs | 2 +- 6 files changed, 100 insertions(+), 43 deletions(-) diff --git a/aquatic_udp/src/lib/common/announce.rs b/aquatic_udp/src/lib/common/announce.rs index 4a71787..2a63b61 100644 --- a/aquatic_udp/src/lib/common/announce.rs +++ b/aquatic_udp/src/lib/common/announce.rs @@ -175,4 +175,4 @@ mod tests { quickcheck(prop as fn((u16, u16)) -> TestResult); } -} \ No newline at end of file +} diff --git a/aquatic_udp/src/lib/common/network.rs b/aquatic_udp/src/lib/common/network.rs index e69de29..8b13789 100644 --- a/aquatic_udp/src/lib/common/network.rs +++ b/aquatic_udp/src/lib/common/network.rs @@ -0,0 +1 @@ + diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 2dc9763..ad28f75 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -1,23 +1,26 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use futures_lite::stream::empty; +use futures_lite::StreamExt; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; use glommio::prelude::*; -use glommio::channels::shared_channel::{SharedReceiver, SharedSender}; -use rand::SeedableRng; use rand::prelude::SmallRng; +use rand::SeedableRng; -use crate::config::Config; -use crate::common::*; use crate::common::announce::handle_announce_request; +use crate::common::*; +use crate::config::Config; pub fn run_request_worker( config: Config, - request_receiver: SharedReceiver<(AnnounceRequest, SocketAddr)>, - response_sender: SharedSender<(AnnounceResponse, SocketAddr)>, + request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, + response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, ) { LocalExecutorBuilder::default() .spawn(|| async move { - let request_receiver = request_receiver.connect().await; - let response_sender = response_sender.connect().await; + let (_, mut request_receivers) = + request_mesh_builder.join(Role::Consumer).await.unwrap(); + let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); let mut rng = SmallRng::from_entropy(); @@ -28,23 +31,38 @@ pub fn run_request_worker( // Needs to be updated periodically: use timer? let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - while let Some((request, addr)) = request_receiver.recv().await { + let mut stream = empty().boxed_local(); + + for (_, receiver) in request_receivers.streams() { + stream = Box::pin(stream.race(receiver)); + } + + while let Some((producer_index, request, addr)) = stream.next().await { let response = match addr.ip() { - IpAddr::V4(ip) => { - handle_announce_request(&config, &mut rng, &mut torrents_ipv4, request, ip, peer_valid_until) - }, - IpAddr::V6(ip) => { - handle_announce_request(&config, &mut rng, &mut torrents_ipv6, request, ip, peer_valid_until) - }, + IpAddr::V4(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents_ipv4, + request, + ip, + peer_valid_until, + ), + IpAddr::V6(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents_ipv6, + request, + ip, + peer_valid_until, + ), }; - if let Err(err) = response_sender.try_send((response, addr)) { + if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { ::log::warn!("response_sender.try_send: {:?}", err); } } - }) .expect("failed to spawn local executor") .join() .unwrap(); -} \ No newline at end of file +} diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 030b2ee..9381d7f 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,2 +1,34 @@ +use std::sync::{atomic::AtomicUsize, Arc}; + +use glommio::channels::channel_mesh::MeshBuilder; + +use crate::config::Config; + pub mod handlers; pub mod network; + +fn start_workers(config: Config) { + let num_peers = config.socket_workers + config.request_workers; + + let request_mesh_builder = MeshBuilder::partial(num_peers, 1024); + let response_mesh_builder = MeshBuilder::partial(num_peers, 1024); + + let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + + for _ in 0..(config.socket_workers) { + network::run_socket_worker( + config.clone(), + request_mesh_builder.clone(), + response_mesh_builder.clone(), + num_bound_sockets.clone(), + ); + } + + for _ in 0..(config.request_workers) { + handlers::run_request_worker( + config.clone(), + request_mesh_builder.clone(), + response_mesh_builder.clone(), + ); + } +} diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 1feb540..a4c3688 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -1,7 +1,5 @@ /// TODO -/// - forward announce requests to request workers sharded by info hash (with -/// some nice algo to make it difficult for an attacker to know which one -/// they get forwarded to) +/// - Don't use race, use other means to receive from multiple channels use std::io::Cursor; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; @@ -11,8 +9,8 @@ use std::sync::{ }; use futures_lite::StreamExt; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Receivers, Role, Senders}; use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender}; -use glommio::channels::shared_channel::{SharedReceiver, SharedSender}; use glommio::net::UdpSocket; use glommio::prelude::*; use rand::prelude::{Rng, SeedableRng, StdRng}; @@ -23,10 +21,9 @@ use crate::common::*; use crate::config::Config; pub fn run_socket_worker( - state: State, config: Config, - request_sender: SharedSender<(AnnounceRequest, SocketAddr)>, - response_receiver: SharedReceiver<(AnnounceResponse, SocketAddr)>, + request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, + response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, num_bound_sockets: Arc, ) { LocalExecutorBuilder::default() @@ -45,15 +42,21 @@ pub fn run_socket_worker( num_bound_sockets.fetch_add(1, Ordering::SeqCst); + let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); + + let (_, response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); + + let response_consumer_index = response_receivers.consumer_id().unwrap(); + spawn_local(read_requests( config.clone(), - state.access_list.clone(), - request_sender, + request_senders, + response_consumer_index, local_sender, socket.clone(), )) .await; - spawn_local(send_responses(response_receiver, local_receiver, socket)).await; + spawn_local(send_responses(response_receivers, local_receiver, socket)).await; }) .expect("failed to spawn local executor") .join() @@ -62,18 +65,17 @@ pub fn run_socket_worker( async fn read_requests( config: Config, - access_list: Arc, - request_sender: SharedSender<(AnnounceRequest, SocketAddr)>, + request_senders: Senders<(usize, AnnounceRequest, SocketAddr)>, + response_consumer_index: usize, local_sender: LocalSender<(Response, SocketAddr)>, socket: Rc, ) { - let request_sender = request_sender.connect().await; - let mut rng = StdRng::from_entropy(); let valid_until = ValidUntil::new(config.cleaning.max_connection_age); let access_list_mode = config.access_list.mode; + let access_list = AccessList::default(); let mut connections = ConnectionMap::default(); let mut buf = [0u8; 2048]; @@ -99,9 +101,13 @@ async fn read_requests( Ok(Request::Announce(request)) => { if connections.contains(request.connection_id, src) { if access_list.allows(access_list_mode, &request.info_hash.0) { - if let Err(err) = request_sender - .try_send((request, src)) - { + let request_consumer_index = + (request.info_hash.0[0] as usize) % config.request_workers; + + if let Err(err) = request_senders.try_send_to( + request_consumer_index, + (response_consumer_index, request, src), + ) { ::log::warn!("request_sender.try_send failed: {:?}", err) } } else { @@ -155,18 +161,18 @@ async fn read_requests( } async fn send_responses( - response_receiver: SharedReceiver<(AnnounceResponse, SocketAddr)>, + mut response_receivers: Receivers<(AnnounceResponse, SocketAddr)>, local_receiver: LocalReceiver<(Response, SocketAddr)>, socket: Rc, ) { - let response_receiver = response_receiver.connect().await; - let mut buf = [0u8; MAX_PACKET_SIZE]; let mut buf = Cursor::new(&mut buf[..]); - let mut stream = local_receiver - .stream() - .race(response_receiver.map(|(response, addr)| (response.into(), addr))); + let mut stream = local_receiver.stream().boxed_local(); + + for (_, receiver) in response_receivers.streams().into_iter() { + stream = Box::pin(stream.race(receiver.map(|(response, addr)| (response.into(), addr)))); + } while let Some((response, src)) = stream.next().await { buf.set_position(0); diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs index 43406f5..99e6a46 100644 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ b/aquatic_udp/src/lib/mio/handlers/announce.rs @@ -7,8 +7,8 @@ use rand::rngs::SmallRng; use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_udp_protocol::*; -use crate::common::*; use crate::common::announce::handle_announce_request; +use crate::common::*; use crate::config::Config; #[inline]