From 2b4cd2dea1a64007fc8d45973cb18df8524357a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 13 Aug 2020 01:50:18 +0200 Subject: [PATCH] aquatic_http: support multiple request workers --- aquatic_http/src/lib/common.rs | 1 + aquatic_http/src/lib/config.rs | 4 ++++ aquatic_http/src/lib/lib.rs | 28 +++++++++++++++++++--------- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index 258f631..fe14c0a 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -151,6 +151,7 @@ pub type RequestChannelReceiver = Receiver<(ConnectionMeta, Request)>; pub type ResponseChannelReceiver = Receiver<(ConnectionMeta, Response)>; +#[derive(Clone)] pub struct ResponseChannelSender { senders: Vec>, } diff --git a/aquatic_http/src/lib/config.rs b/aquatic_http/src/lib/config.rs index 2355869..8120f41 100644 --- a/aquatic_http/src/lib/config.rs +++ b/aquatic_http/src/lib/config.rs @@ -12,6 +12,9 @@ pub struct Config { /// them on to the request handler. They then recieve responses from the /// request handler, encode them and send them back over the socket. pub socket_workers: usize, + /// Request workers receive a number of requests from socket workers, + /// generate responses and send them back to the socket workers. + pub request_workers: usize, pub log_level: LogLevel, pub network: NetworkConfig, pub protocol: ProtocolConfig, @@ -102,6 +105,7 @@ impl Default for Config { fn default() -> Self { Self { socket_workers: 1, + request_workers: 1, log_level: LogLevel::default(), network: NetworkConfig::default(), protocol: ProtocolConfig::default(), diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index e0d3179..203c7ed 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -22,10 +22,21 @@ pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; pub fn run(config: Config) -> anyhow::Result<()> { - let opt_tls_acceptor = create_tls_acceptor(&config.network.tls)?; - let state = State::default(); + start_workers(config.clone(), state.clone())?; + + loop { + ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); + + tasks::clean_torrents(&state); + } +} + + +pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> { + let opt_tls_acceptor = create_tls_acceptor(&config.network.tls)?; + let (request_channel_sender, request_channel_receiver) = ::crossbeam_channel::unbounded(); let mut out_message_senders = Vec::new(); @@ -96,11 +107,14 @@ pub fn run(config: Config) -> anyhow::Result<()> { let response_channel_sender = ResponseChannelSender::new(out_message_senders); - { + for i in 0..config.request_workers { let config = config.clone(); let state = state.clone(); + let request_channel_receiver = request_channel_receiver.clone(); + let response_channel_sender = response_channel_sender.clone(); + let wakers = wakers.clone(); - Builder::new().name("request".to_string()).spawn(move || { + Builder::new().name(format!("request-{:02}", i + 1)).spawn(move || { handler::run_request_worker( config, state, @@ -111,10 +125,6 @@ pub fn run(config: Config) -> anyhow::Result<()> { })?; } - loop { - ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - - tasks::clean_torrents(&state); - } + Ok(()) }