aquatic_http: glommio: actually start socket workers

This commit is contained in:
Joakim Frostegård 2021-10-26 19:09:17 +02:00
parent 03b8f3e5c5
commit 02735ba2ff
2 changed files with 57 additions and 1 deletions

View file

@ -1,12 +1,65 @@
use glommio::prelude::*;
use std::sync::{Arc, atomic::AtomicUsize};
use aquatic_common::access_list::AccessList;
use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
use crate::config::Config;
mod network;
const SHARED_CHANNEL_SIZE: usize = 1024;
pub fn run(
config: Config,
) -> anyhow::Result<()> {
let access_list = if config.access_list.mode.is_on() {
AccessList::create_from_path(&config.access_list.path).expect("Load access list")
} else {
AccessList::default()
};
let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
let num_bound_sockets = Arc::new(AtomicUsize::new(0));
let mut executors = Vec::new();
for i in 0..(config.socket_workers) {
let config = config.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let num_bound_sockets = num_bound_sockets.clone();
let access_list = access_list.clone();
let mut builder = LocalExecutorBuilder::default();
// if config.core_affinity.set_affinities {
// builder = builder.pin_to_cpu(config.core_affinity.offset + 1 + i);
// }
let executor = builder.spawn(|| async move {
network::run_socket_worker(
config,
request_mesh_builder,
response_mesh_builder,
num_bound_sockets,
// access_list,
)
.await
});
executors.push(executor);
}
for executor in executors {
executor
.expect("failed to spawn local executor")
.join()
.unwrap();
}
Ok(())
}

View file

@ -2,6 +2,7 @@ use std::cell::RefCell;
use std::io::{BufReader, Cursor, Read};
use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use aquatic_http_protocol::request::{Request, RequestParseError};
use aquatic_http_protocol::response::Response;
@ -38,11 +39,13 @@ pub async fn run_socket_worker(
config: Config,
request_mesh_builder: MeshBuilder<(ConnectionId, Request), Partial>,
response_mesh_builder: MeshBuilder<(ConnectionId, Response), Partial>,
num_bound_sockets: Arc<AtomicUsize>,
) {
let tls_config = Arc::new(create_tls_config(&config));
let config = Rc::new(config);
let listener = TcpListener::bind(config.network.address).expect("bind socket");
num_bound_sockets.fetch_add(1, Ordering::SeqCst);
let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap();