diff --git a/aquatic_http/src/lib/glommio/mod.rs b/aquatic_http/src/lib/glommio/mod.rs index 1bf5358..3bb89fb 100644 --- a/aquatic_http/src/lib/glommio/mod.rs +++ b/aquatic_http/src/lib/glommio/mod.rs @@ -1,12 +1,65 @@ -use glommio::prelude::*; +use std::sync::{Arc, atomic::AtomicUsize}; + +use aquatic_common::access_list::AccessList; +use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; use crate::config::Config; mod network; +const SHARED_CHANNEL_SIZE: usize = 1024; + pub fn run( config: Config, ) -> anyhow::Result<()> { + let access_list = if config.access_list.mode.is_on() { + AccessList::create_from_path(&config.access_list.path).expect("Load access list") + } else { + AccessList::default() + }; + + let num_peers = config.socket_workers + config.request_workers; + + let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); + let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); + + let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + + let mut executors = Vec::new(); + + for i in 0..(config.socket_workers) { + let config = config.clone(); + let request_mesh_builder = request_mesh_builder.clone(); + let response_mesh_builder = response_mesh_builder.clone(); + let num_bound_sockets = num_bound_sockets.clone(); + let access_list = access_list.clone(); + + let mut builder = LocalExecutorBuilder::default(); + + // if config.core_affinity.set_affinities { + // builder = builder.pin_to_cpu(config.core_affinity.offset + 1 + i); + // } + + let executor = builder.spawn(|| async move { + network::run_socket_worker( + config, + request_mesh_builder, + response_mesh_builder, + num_bound_sockets, + // access_list, + ) + .await + }); + + executors.push(executor); + } + + for executor in executors { + executor + .expect("failed to spawn local executor") + .join() + .unwrap(); + } Ok(()) } \ No newline at end of file diff --git a/aquatic_http/src/lib/glommio/network.rs b/aquatic_http/src/lib/glommio/network.rs index 7f1fcdf..87cb2ec 100644 --- a/aquatic_http/src/lib/glommio/network.rs +++ b/aquatic_http/src/lib/glommio/network.rs @@ -2,6 +2,7 @@ use std::cell::RefCell; use std::io::{BufReader, Cursor, Read}; use std::rc::Rc; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use aquatic_http_protocol::request::{Request, RequestParseError}; use aquatic_http_protocol::response::Response; @@ -38,11 +39,13 @@ pub async fn run_socket_worker( config: Config, request_mesh_builder: MeshBuilder<(ConnectionId, Request), Partial>, response_mesh_builder: MeshBuilder<(ConnectionId, Response), Partial>, + num_bound_sockets: Arc, ) { let tls_config = Arc::new(create_tls_config(&config)); let config = Rc::new(config); let listener = TcpListener::bind(config.network.address).expect("bind socket"); + num_bound_sockets.fetch_add(1, Ordering::SeqCst); let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap();