aquatic_udp: use mesh in glommio version

This commit is contained in:
Joakim Frostegård 2021-10-19 01:01:10 +02:00
parent f28808b30c
commit 4a8651e1c6
6 changed files with 100 additions and 43 deletions

View file

@ -0,0 +1 @@

View file

@ -1,23 +1,26 @@
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use futures_lite::stream::empty;
use futures_lite::StreamExt;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role};
use glommio::prelude::*; use glommio::prelude::*;
use glommio::channels::shared_channel::{SharedReceiver, SharedSender};
use rand::SeedableRng;
use rand::prelude::SmallRng; use rand::prelude::SmallRng;
use rand::SeedableRng;
use crate::config::Config;
use crate::common::*;
use crate::common::announce::handle_announce_request; use crate::common::announce::handle_announce_request;
use crate::common::*;
use crate::config::Config;
pub fn run_request_worker( pub fn run_request_worker(
config: Config, config: Config,
request_receiver: SharedReceiver<(AnnounceRequest, SocketAddr)>, request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>,
response_sender: SharedSender<(AnnounceResponse, SocketAddr)>, response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>,
) { ) {
LocalExecutorBuilder::default() LocalExecutorBuilder::default()
.spawn(|| async move { .spawn(|| async move {
let request_receiver = request_receiver.connect().await; let (_, mut request_receivers) =
let response_sender = response_sender.connect().await; request_mesh_builder.join(Role::Consumer).await.unwrap();
let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap();
let mut rng = SmallRng::from_entropy(); let mut rng = SmallRng::from_entropy();
@ -28,21 +31,36 @@ pub fn run_request_worker(
// Needs to be updated periodically: use timer? // Needs to be updated periodically: use timer?
let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
while let Some((request, addr)) = request_receiver.recv().await { let mut stream = empty().boxed_local();
for (_, receiver) in request_receivers.streams() {
stream = Box::pin(stream.race(receiver));
}
while let Some((producer_index, request, addr)) = stream.next().await {
let response = match addr.ip() { let response = match addr.ip() {
IpAddr::V4(ip) => { IpAddr::V4(ip) => handle_announce_request(
handle_announce_request(&config, &mut rng, &mut torrents_ipv4, request, ip, peer_valid_until) &config,
}, &mut rng,
IpAddr::V6(ip) => { &mut torrents_ipv4,
handle_announce_request(&config, &mut rng, &mut torrents_ipv6, request, ip, peer_valid_until) request,
}, ip,
peer_valid_until,
),
IpAddr::V6(ip) => handle_announce_request(
&config,
&mut rng,
&mut torrents_ipv6,
request,
ip,
peer_valid_until,
),
}; };
if let Err(err) = response_sender.try_send((response, addr)) { if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) {
::log::warn!("response_sender.try_send: {:?}", err); ::log::warn!("response_sender.try_send: {:?}", err);
} }
} }
}) })
.expect("failed to spawn local executor") .expect("failed to spawn local executor")
.join() .join()

View file

@ -1,2 +1,34 @@
use std::sync::{atomic::AtomicUsize, Arc};
use glommio::channels::channel_mesh::MeshBuilder;
use crate::config::Config;
pub mod handlers; pub mod handlers;
pub mod network; pub mod network;
fn start_workers(config: Config) {
let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, 1024);
let response_mesh_builder = MeshBuilder::partial(num_peers, 1024);
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
for _ in 0..(config.socket_workers) {
network::run_socket_worker(
config.clone(),
request_mesh_builder.clone(),
response_mesh_builder.clone(),
num_bound_sockets.clone(),
);
}
for _ in 0..(config.request_workers) {
handlers::run_request_worker(
config.clone(),
request_mesh_builder.clone(),
response_mesh_builder.clone(),
);
}
}

View file

@ -1,7 +1,5 @@
/// TODO /// TODO
/// - forward announce requests to request workers sharded by info hash (with /// - Don't use race, use other means to receive from multiple channels
/// some nice algo to make it difficult for an attacker to know which one
/// they get forwarded to)
use std::io::Cursor; use std::io::Cursor;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::rc::Rc; use std::rc::Rc;
@ -11,8 +9,8 @@ use std::sync::{
}; };
use futures_lite::StreamExt; use futures_lite::StreamExt;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Receivers, Role, Senders};
use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender}; use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender};
use glommio::channels::shared_channel::{SharedReceiver, SharedSender};
use glommio::net::UdpSocket; use glommio::net::UdpSocket;
use glommio::prelude::*; use glommio::prelude::*;
use rand::prelude::{Rng, SeedableRng, StdRng}; use rand::prelude::{Rng, SeedableRng, StdRng};
@ -23,10 +21,9 @@ use crate::common::*;
use crate::config::Config; use crate::config::Config;
pub fn run_socket_worker( pub fn run_socket_worker(
state: State,
config: Config, config: Config,
request_sender: SharedSender<(AnnounceRequest, SocketAddr)>, request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>,
response_receiver: SharedReceiver<(AnnounceResponse, SocketAddr)>, response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>,
num_bound_sockets: Arc<AtomicUsize>, num_bound_sockets: Arc<AtomicUsize>,
) { ) {
LocalExecutorBuilder::default() LocalExecutorBuilder::default()
@ -45,15 +42,21 @@ pub fn run_socket_worker(
num_bound_sockets.fetch_add(1, Ordering::SeqCst); num_bound_sockets.fetch_add(1, Ordering::SeqCst);
let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
let (_, response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap();
let response_consumer_index = response_receivers.consumer_id().unwrap();
spawn_local(read_requests( spawn_local(read_requests(
config.clone(), config.clone(),
state.access_list.clone(), request_senders,
request_sender, response_consumer_index,
local_sender, local_sender,
socket.clone(), socket.clone(),
)) ))
.await; .await;
spawn_local(send_responses(response_receiver, local_receiver, socket)).await; spawn_local(send_responses(response_receivers, local_receiver, socket)).await;
}) })
.expect("failed to spawn local executor") .expect("failed to spawn local executor")
.join() .join()
@ -62,18 +65,17 @@ pub fn run_socket_worker(
async fn read_requests( async fn read_requests(
config: Config, config: Config,
access_list: Arc<AccessList>, request_senders: Senders<(usize, AnnounceRequest, SocketAddr)>,
request_sender: SharedSender<(AnnounceRequest, SocketAddr)>, response_consumer_index: usize,
local_sender: LocalSender<(Response, SocketAddr)>, local_sender: LocalSender<(Response, SocketAddr)>,
socket: Rc<UdpSocket>, socket: Rc<UdpSocket>,
) { ) {
let request_sender = request_sender.connect().await;
let mut rng = StdRng::from_entropy(); let mut rng = StdRng::from_entropy();
let valid_until = ValidUntil::new(config.cleaning.max_connection_age); let valid_until = ValidUntil::new(config.cleaning.max_connection_age);
let access_list_mode = config.access_list.mode; let access_list_mode = config.access_list.mode;
let access_list = AccessList::default();
let mut connections = ConnectionMap::default(); let mut connections = ConnectionMap::default();
let mut buf = [0u8; 2048]; let mut buf = [0u8; 2048];
@ -99,9 +101,13 @@ async fn read_requests(
Ok(Request::Announce(request)) => { Ok(Request::Announce(request)) => {
if connections.contains(request.connection_id, src) { if connections.contains(request.connection_id, src) {
if access_list.allows(access_list_mode, &request.info_hash.0) { if access_list.allows(access_list_mode, &request.info_hash.0) {
if let Err(err) = request_sender let request_consumer_index =
.try_send((request, src)) (request.info_hash.0[0] as usize) % config.request_workers;
{
if let Err(err) = request_senders.try_send_to(
request_consumer_index,
(response_consumer_index, request, src),
) {
::log::warn!("request_sender.try_send failed: {:?}", err) ::log::warn!("request_sender.try_send failed: {:?}", err)
} }
} else { } else {
@ -155,18 +161,18 @@ async fn read_requests(
} }
async fn send_responses( async fn send_responses(
response_receiver: SharedReceiver<(AnnounceResponse, SocketAddr)>, mut response_receivers: Receivers<(AnnounceResponse, SocketAddr)>,
local_receiver: LocalReceiver<(Response, SocketAddr)>, local_receiver: LocalReceiver<(Response, SocketAddr)>,
socket: Rc<UdpSocket>, socket: Rc<UdpSocket>,
) { ) {
let response_receiver = response_receiver.connect().await;
let mut buf = [0u8; MAX_PACKET_SIZE]; let mut buf = [0u8; MAX_PACKET_SIZE];
let mut buf = Cursor::new(&mut buf[..]); let mut buf = Cursor::new(&mut buf[..]);
let mut stream = local_receiver let mut stream = local_receiver.stream().boxed_local();
.stream()
.race(response_receiver.map(|(response, addr)| (response.into(), addr))); for (_, receiver) in response_receivers.streams().into_iter() {
stream = Box::pin(stream.race(receiver.map(|(response, addr)| (response.into(), addr))));
}
while let Some((response, src)) = stream.next().await { while let Some((response, src)) = stream.next().await {
buf.set_position(0); buf.set_position(0);

View file

@ -7,8 +7,8 @@ use rand::rngs::SmallRng;
use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
use crate::common::*;
use crate::common::announce::handle_announce_request; use crate::common::announce::handle_announce_request;
use crate::common::*;
use crate::config::Config; use crate::config::Config;
#[inline] #[inline]