mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
aquatic_http: support multiple request workers
This commit is contained in:
parent
875eb122c7
commit
2b4cd2dea1
3 changed files with 24 additions and 9 deletions
|
|
@ -151,6 +151,7 @@ pub type RequestChannelReceiver = Receiver<(ConnectionMeta, Request)>;
|
|||
pub type ResponseChannelReceiver = Receiver<(ConnectionMeta, Response)>;
|
||||
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ResponseChannelSender {
|
||||
senders: Vec<Sender<(ConnectionMeta, Response)>>,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,9 @@ pub struct Config {
|
|||
/// them on to the request handler. They then recieve responses from the
|
||||
/// request handler, encode them and send them back over the socket.
|
||||
pub socket_workers: usize,
|
||||
/// Request workers receive a number of requests from socket workers,
|
||||
/// generate responses and send them back to the socket workers.
|
||||
pub request_workers: usize,
|
||||
pub log_level: LogLevel,
|
||||
pub network: NetworkConfig,
|
||||
pub protocol: ProtocolConfig,
|
||||
|
|
@ -102,6 +105,7 @@ impl Default for Config {
|
|||
fn default() -> Self {
|
||||
Self {
|
||||
socket_workers: 1,
|
||||
request_workers: 1,
|
||||
log_level: LogLevel::default(),
|
||||
network: NetworkConfig::default(),
|
||||
protocol: ProtocolConfig::default(),
|
||||
|
|
|
|||
|
|
@ -22,10 +22,21 @@ pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker";
|
|||
|
||||
|
||||
pub fn run(config: Config) -> anyhow::Result<()> {
|
||||
let opt_tls_acceptor = create_tls_acceptor(&config.network.tls)?;
|
||||
|
||||
let state = State::default();
|
||||
|
||||
start_workers(config.clone(), state.clone())?;
|
||||
|
||||
loop {
|
||||
::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
|
||||
|
||||
tasks::clean_torrents(&state);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> {
|
||||
let opt_tls_acceptor = create_tls_acceptor(&config.network.tls)?;
|
||||
|
||||
let (request_channel_sender, request_channel_receiver) = ::crossbeam_channel::unbounded();
|
||||
|
||||
let mut out_message_senders = Vec::new();
|
||||
|
|
@ -96,11 +107,14 @@ pub fn run(config: Config) -> anyhow::Result<()> {
|
|||
|
||||
let response_channel_sender = ResponseChannelSender::new(out_message_senders);
|
||||
|
||||
{
|
||||
for i in 0..config.request_workers {
|
||||
let config = config.clone();
|
||||
let state = state.clone();
|
||||
let request_channel_receiver = request_channel_receiver.clone();
|
||||
let response_channel_sender = response_channel_sender.clone();
|
||||
let wakers = wakers.clone();
|
||||
|
||||
Builder::new().name("request".to_string()).spawn(move || {
|
||||
Builder::new().name(format!("request-{:02}", i + 1)).spawn(move || {
|
||||
handler::run_request_worker(
|
||||
config,
|
||||
state,
|
||||
|
|
@ -111,10 +125,6 @@ pub fn run(config: Config) -> anyhow::Result<()> {
|
|||
})?;
|
||||
}
|
||||
|
||||
loop {
|
||||
::std::thread::sleep(Duration::from_secs(config.cleaning.interval));
|
||||
|
||||
tasks::clean_torrents(&state);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue