aquatic_ws: support multiple request workers

This commit is contained in:
Joakim Frostegård 2020-08-13 01:50:01 +02:00
parent cf0623e302
commit 875eb122c7
3 changed files with 24 additions and 9 deletions

View file

@ -118,6 +118,7 @@ pub type InMessageReceiver = Receiver<(ConnectionMeta, InMessage)>;
pub type OutMessageReceiver = Receiver<(ConnectionMeta, OutMessage)>;
#[derive(Clone)]
pub struct OutMessageSender(Vec<Sender<(ConnectionMeta, OutMessage)>>);

View file

@ -12,6 +12,9 @@ pub struct Config {
/// them on to the request handler. They then recieve responses from the
/// request handler, encode them and send them back over the socket.
pub socket_workers: usize,
/// Request workers receive a number of requests from socket workers,
/// generate responses and send them back to the socket workers.
pub request_workers: usize,
pub log_level: LogLevel,
pub network: NetworkConfig,
pub protocol: ProtocolConfig,
@ -94,6 +97,7 @@ impl Default for Config {
fn default() -> Self {
Self {
socket_workers: 1,
request_workers: 1,
log_level: LogLevel::default(),
network: NetworkConfig::default(),
protocol: ProtocolConfig::default(),

View file

@ -24,10 +24,21 @@ pub const APP_NAME: &str = "aquatic_ws: WebTorrent tracker";
pub fn run(config: Config) -> anyhow::Result<()> {
let opt_tls_acceptor = create_tls_acceptor(&config)?;
let state = State::default();
start_workers(config.clone(), state.clone())?;
loop {
::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
tasks::clean_torrents(&state);
}
}
pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> {
let opt_tls_acceptor = create_tls_acceptor(&config)?;
let (in_message_sender, in_message_receiver) = ::crossbeam_channel::unbounded();
let mut out_message_senders = Vec::new();
@ -98,11 +109,14 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let out_message_sender = OutMessageSender::new(out_message_senders);
{
for i in 0..config.request_workers {
let config = config.clone();
let state = state.clone();
let in_message_receiver = in_message_receiver.clone();
let out_message_sender = out_message_sender.clone();
let wakers = wakers.clone();
Builder::new().name("request".to_string()).spawn(move || {
Builder::new().name(format!("request-{:02}", i + 1)).spawn(move || {
handler::run_request_worker(
config,
state,
@ -113,11 +127,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
})?;
}
loop {
::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
tasks::clean_torrents(&state);
}
Ok(())
}