From 8f0dabc70624ae30eb1fb7835042d0b7fc66e664 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 27 Oct 2021 00:38:53 +0200 Subject: [PATCH] aquatic_http: work on glommio request handlers --- aquatic_http/src/lib/common/mod.rs | 12 +- aquatic_http/src/lib/glommio/common.rs | 56 +++++++++ aquatic_http/src/lib/glommio/handlers.rs | 151 +++++++++++++++++++++++ aquatic_http/src/lib/glommio/mod.rs | 29 +++++ aquatic_http/src/lib/glommio/network.rs | 91 ++++++++++---- aquatic_http/src/lib/mio/network/mod.rs | 12 +- 6 files changed, 311 insertions(+), 40 deletions(-) create mode 100644 aquatic_http/src/lib/glommio/common.rs diff --git a/aquatic_http/src/lib/common/mod.rs b/aquatic_http/src/lib/common/mod.rs index 845db1c..9ff51b1 100644 --- a/aquatic_http/src/lib/common/mod.rs +++ b/aquatic_http/src/lib/common/mod.rs @@ -1,12 +1,10 @@ use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::sync::Arc; use std::time::Instant; use aquatic_common::access_list::{AccessList}; use either::Either; use hashbrown::HashMap; use indexmap::IndexMap; -use mio::Token; use smartstring::{LazyCompact, SmartString}; pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil}; @@ -27,15 +25,15 @@ impl Ip for Ipv6Addr {} pub struct ConnectionMeta { /// Index of socket worker responsible for this connection. Required for /// sending back response through correct channel to correct worker. - pub worker_index: usize, + pub worker_index: usize, // Or response consumer id in glommio pub peer_addr: SocketAddr, - pub poll_token: Token, + pub poll_token: usize, // Or connection id in glommio } #[derive(Clone, Copy, Debug)] pub struct PeerConnectionMeta { pub worker_index: usize, - pub poll_token: Token, + pub poll_token: usize, pub peer_ip_address: I, } @@ -113,14 +111,14 @@ pub struct TorrentMaps { } impl TorrentMaps { - pub fn clean(&mut self, config: &Config, access_list: &Arc) { + pub fn clean(&mut self, config: &Config, access_list: &AccessList) { Self::clean_torrent_map(config, access_list, &mut self.ipv4); Self::clean_torrent_map(config, access_list, &mut self.ipv6); } fn clean_torrent_map( config: &Config, - access_list: &Arc, + access_list: &AccessList, torrent_map: &mut TorrentMap, ) { let now = Instant::now(); diff --git a/aquatic_http/src/lib/glommio/common.rs b/aquatic_http/src/lib/glommio/common.rs new file mode 100644 index 0000000..1752deb --- /dev/null +++ b/aquatic_http/src/lib/glommio/common.rs @@ -0,0 +1,56 @@ +use std::net::SocketAddr; + +use aquatic_http_protocol::{request::{AnnounceRequest, ScrapeRequest}, response::{AnnounceResponse, ScrapeResponse}}; + +#[derive(Copy, Clone, Debug)] +pub struct ConsumerId(pub usize); + +#[derive(Clone, Copy, Debug)] +pub struct ConnectionId(pub usize); + +#[derive(Debug)] +pub enum ChannelRequest { + Announce { + request: AnnounceRequest, + peer_addr: SocketAddr, + connection_id: ConnectionId, + response_consumer_id: ConsumerId, + }, + Scrape { + request: ScrapeRequest, + peer_addr: SocketAddr, + original_indices: Vec, + connection_id: ConnectionId, + response_consumer_id: ConsumerId, + } +} + +#[derive(Debug)] +pub enum ChannelResponse { + Announce { + response: AnnounceResponse, + peer_addr: SocketAddr, + connection_id: ConnectionId, + }, + Scrape { + response: ScrapeResponse, + peer_addr: SocketAddr, + original_indices: Vec, + connection_id: ConnectionId, + } +} + +impl ChannelResponse { + pub fn get_connection_id(&self) -> ConnectionId { + match self { + Self::Announce { connection_id, .. } => *connection_id, + Self::Scrape { connection_id, .. } => *connection_id, + } + } + pub fn get_peer_addr(&self) -> SocketAddr { + match self { + Self::Announce { peer_addr, .. } => *peer_addr, + Self::Scrape { peer_addr, .. } => *peer_addr, + } + } +} \ No newline at end of file diff --git a/aquatic_http/src/lib/glommio/handlers.rs b/aquatic_http/src/lib/glommio/handlers.rs index e69de29..6457318 100644 --- a/aquatic_http/src/lib/glommio/handlers.rs +++ b/aquatic_http/src/lib/glommio/handlers.rs @@ -0,0 +1,151 @@ +use std::cell::RefCell; +use std::rc::Rc; +use std::time::Duration; + +use aquatic_common::access_list::AccessList; +use futures_lite::{Stream, StreamExt}; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; +use glommio::timer::TimerActionRepeat; +use glommio::{enclose, prelude::*}; +use rand::prelude::SmallRng; +use rand::SeedableRng; + +use crate::common::handlers::handle_announce_request; +use crate::common::handlers::*; +use crate::common::*; +use crate::config::Config; + +use super::common::*; + +pub async fn run_request_worker( + config: Config, + request_mesh_builder: MeshBuilder, + response_mesh_builder: MeshBuilder, + access_list: AccessList, +) { + let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); + let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); + + let response_senders = Rc::new(response_senders); + + let torrents = Rc::new(RefCell::new(TorrentMaps::default())); + let access_list = Rc::new(RefCell::new(access_list)); + + // Periodically clean torrents and update access list + TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { + enclose!((config, torrents, access_list) move || async move { + // update_access_list(config.clone(), access_list.clone()).await; + + torrents.borrow_mut().clean(&config, &*access_list.borrow()); + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); + + let mut handles = Vec::new(); + + for (_, receiver) in request_receivers.streams() { + let handle = spawn_local(handle_request_stream( + config.clone(), + torrents.clone(), + response_senders.clone(), + receiver, + )) + .detach(); + + handles.push(handle); + } + + for handle in handles { + handle.await; + } +} + +async fn handle_request_stream( + config: Config, + torrents: Rc>, + response_senders: Rc>, + mut stream: S, +) where + S: Stream + ::std::marker::Unpin, +{ + let mut rng = SmallRng::from_entropy(); + + let max_peer_age = config.cleaning.max_peer_age; + let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); + + TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { + enclose!((peer_valid_until) move || async move { + *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); + + Some(Duration::from_secs(1)) + })() + })); + + while let Some(channel_request) = stream.next().await { + let (response, consumer_id) = match channel_request { + ChannelRequest::Announce { + request, + peer_addr, + response_consumer_id, + connection_id + } => { + let meta = ConnectionMeta { + worker_index: response_consumer_id.0, + poll_token: connection_id.0, + peer_addr, + }; + + let response = handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut(), + peer_valid_until.borrow().to_owned(), + meta, + request, + ); + + let response = ChannelResponse::Announce { + response, + peer_addr, + connection_id, + }; + + (response, response_consumer_id) + } + ChannelRequest::Scrape { + request, + peer_addr, + response_consumer_id, + connection_id, + original_indices, + } => { + let meta = ConnectionMeta { + worker_index: response_consumer_id.0, + poll_token: connection_id.0, + peer_addr, + }; + + let response = handle_scrape_request(&config, &mut torrents.borrow_mut(), meta, request); + + let response = ChannelResponse::Scrape { + response, + peer_addr, + connection_id, + original_indices, + }; + + (response, response_consumer_id) + } + }; + + ::log::debug!("preparing to send response to channel: {:?}", response); + + if let Err(err) = response_senders.try_send_to(consumer_id.0, response) { + ::log::warn!("response_sender.try_send: {:?}", err); + } + + yield_if_needed().await; + } +} + diff --git a/aquatic_http/src/lib/glommio/mod.rs b/aquatic_http/src/lib/glommio/mod.rs index a88a8d4..531a300 100644 --- a/aquatic_http/src/lib/glommio/mod.rs +++ b/aquatic_http/src/lib/glommio/mod.rs @@ -5,6 +5,7 @@ use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use crate::config::Config; +mod common; mod handlers; mod network; @@ -59,6 +60,34 @@ pub fn run( executors.push(executor); } + for i in 0..(config.request_workers) { + let config = config.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + let access_list = access_list.clone(); + + let mut builder = LocalExecutorBuilder::default(); + + // if config.core_affinity.set_affinities { + // builder = + // builder.pin_to_cpu(config.core_affinity.offset + 1 + config.socket_workers + i); + // } + + let executor = builder.spawn(|| async move { + handlers::run_request_worker( + config, + request_mesh_builder, + response_mesh_builder, + access_list, + ) + .await + }); + + executors.push(executor); + } + + // drop_privileges_after_socket_binding(&config, num_bound_sockets).unwrap(); + for executor in executors { executor .expect("failed to spawn local executor") diff --git a/aquatic_http/src/lib/glommio/network.rs b/aquatic_http/src/lib/glommio/network.rs index 6b21c84..29f1e75 100644 --- a/aquatic_http/src/lib/glommio/network.rs +++ b/aquatic_http/src/lib/glommio/network.rs @@ -4,7 +4,8 @@ use std::rc::Rc; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use aquatic_http_protocol::request::{Request, RequestParseError}; +use aquatic_http_protocol::common::InfoHash; +use aquatic_http_protocol::request::{AnnounceRequest, Request, RequestParseError}; use aquatic_http_protocol::response::{FailureResponse, Response}; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Receivers, Role, Senders}; @@ -19,23 +20,24 @@ use slab::Slab; use crate::common::num_digits_in_usize; use crate::config::Config; +use super::common::*; + const BUFFER_SIZE: usize = 1024; -#[derive(Clone, Copy, Debug)] -pub struct ConnectionId(pub usize); struct ConnectionReference { - response_sender: LocalSender, + response_sender: LocalSender, handle: JoinHandle<()>, } struct Connection { config: Rc, - // request_senders: Rc>, - response_receiver: LocalReceiver, + request_senders: Rc>, + response_receiver: LocalReceiver, + response_consumer_id: ConsumerId, tls: ServerConnection, stream: TcpStream, - index: ConnectionId, + connection_id: ConnectionId, request_buffer: Vec, close_after_writing: bool, } @@ -43,8 +45,8 @@ struct Connection { pub async fn run_socket_worker( config: Config, tls_config: Arc, - request_mesh_builder: MeshBuilder<(ConnectionId, Request), Partial>, - response_mesh_builder: MeshBuilder<(ConnectionId, Response), Partial>, + request_mesh_builder: MeshBuilder, + response_mesh_builder: MeshBuilder, num_bound_sockets: Arc, ) { let config = Rc::new(config); @@ -52,16 +54,18 @@ pub async fn run_socket_worker( let listener = TcpListener::bind(config.network.address).expect("bind socket"); num_bound_sockets.fetch_add(1, Ordering::SeqCst); - // let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); + let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); - // let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); - // let request_senders = Rc::new(request_senders); + let response_consumer_id = ConsumerId(response_receivers.consumer_id().unwrap()); + + let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); + let request_senders = Rc::new(request_senders); let connection_slab = Rc::new(RefCell::new(Slab::new())); - // for (_, response_receiver) in response_receivers.streams() { - // spawn_local(receive_responses(response_receiver, connection_slab.clone())).detach(); - // } + for (_, response_receiver) in response_receivers.streams() { + spawn_local(receive_responses(response_receiver, connection_slab.clone())).detach(); + } let mut incoming = listener.incoming(); @@ -75,11 +79,12 @@ pub async fn run_socket_worker( let conn = Connection { config: config.clone(), - // request_senders: request_senders.clone(), + request_senders: request_senders.clone(), response_receiver, + response_consumer_id, tls: ServerConnection::new(tls_config.clone()).unwrap(), stream, - index: ConnectionId(entry.key()), + connection_id: ConnectionId(entry.key()), request_buffer: Vec::new(), close_after_writing: false, }; @@ -108,12 +113,12 @@ pub async fn run_socket_worker( } async fn receive_responses( - mut response_receiver: ConnectedReceiver<(ConnectionId, Response)>, + mut response_receiver: ConnectedReceiver, connection_references: Rc>>, ) { - while let Some((connection_id, response)) = response_receiver.next().await { - if let Some(reference) = connection_references.borrow().get(connection_id.0) { - reference.response_sender.try_send(response); + while let Some(channel_response) = response_receiver.next().await { + if let Some(reference) = connection_references.borrow().get(channel_response.get_connection_id().0) { + reference.response_sender.try_send(channel_response); } } } @@ -128,16 +133,44 @@ impl Connection { .peer_addr() .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?; - // TODO: send request to channel + match request { + Request::Announce(request@AnnounceRequest { info_hash, .. }) => { + let request = ChannelRequest::Announce { + request, + connection_id: self.connection_id, + response_consumer_id: self.response_consumer_id, + peer_addr, + }; + + let consumer_index = calculate_request_consumer_index(&self.config, info_hash); + self.request_senders.try_send_to(consumer_index, request); + }, + Request::Scrape(request) => { + // TODO + }, + } // Wait for response to arrive, then send it - if let Some(response) = self.response_receiver.recv().await { - // TODO: compare IP addresses? + if let Some(channel_response) = self.response_receiver.recv().await { + if channel_response.get_peer_addr() != peer_addr { + return Err(anyhow::anyhow!("peer addressess didn't match")); + } + + let opt_response = match channel_response { + ChannelResponse::Announce { response, .. } => { + Some(Response::Announce(response)) + } + ChannelResponse::Scrape { response, original_indices, .. } => { + None // TODO: accumulate scrape requests + } + }; - self.queue_response(&response)?; + if let Some(response) = opt_response { + self.queue_response(&response)?; - if !self.config.network.keep_alive { - self.close_after_writing = true; + if !self.config.network.keep_alive { + self.close_after_writing = true; + } } } } @@ -274,3 +307,7 @@ impl Connection { Ok(()) } } + +fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { + (info_hash.0[0] as usize) % config.request_workers +} \ No newline at end of file diff --git a/aquatic_http/src/lib/mio/network/mod.rs b/aquatic_http/src/lib/mio/network/mod.rs index 52e8b0d..7887f95 100644 --- a/aquatic_http/src/lib/mio/network/mod.rs +++ b/aquatic_http/src/lib/mio/network/mod.rs @@ -217,7 +217,7 @@ pub fn handle_connection_read_event( { let meta = ConnectionMeta { worker_index: socket_worker_index, - poll_token, + poll_token: poll_token.0, peer_addr: established.peer_addr, }; let response = FailureResponse::new("Info hash not allowed"); @@ -231,7 +231,7 @@ pub fn handle_connection_read_event( Ok(request) => { let meta = ConnectionMeta { worker_index: socket_worker_index, - poll_token, + poll_token: poll_token.0, peer_addr: established.peer_addr, }; @@ -254,7 +254,7 @@ pub fn handle_connection_read_event( let meta = ConnectionMeta { worker_index: socket_worker_index, - poll_token, + poll_token: poll_token.0, peer_addr: established.peer_addr, }; @@ -322,7 +322,7 @@ pub fn send_responses( for (meta, response) in local_responses.chain(channel_responses_drain) { if let Some(established) = connections - .get_mut(&meta.poll_token) + .get_mut(&Token(meta.poll_token)) .and_then(Connection::get_established) { if established.peer_addr != meta.peer_addr { @@ -344,7 +344,7 @@ pub fn send_responses( ); if !config.network.keep_alive { - remove_connection(poll, connections, &meta.poll_token); + remove_connection(poll, connections, &Token(meta.poll_token)); } } Err(err) if err.kind() == ErrorKind::WouldBlock => { @@ -353,7 +353,7 @@ pub fn send_responses( Err(err) => { info!("error sending response: {}", err); - remove_connection(poll, connections, &meta.poll_token); + remove_connection(poll, connections, &Token(meta.poll_token)); } } }