From cce7bd7150f7c5312e8604a2cc673d2162416f8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 10 Aug 2020 02:48:28 +0200 Subject: [PATCH] aquatic_http: send responses for each event, use mio poll waker This means * less fluctuation in number of responses send per second * longer poll timeouts can be used since poll is woken when responses are available for sending * drain-like method used to fetch responses from response channel, meaning responses added while iterating won't be processed --- aquatic_http/src/lib/common.rs | 15 +++++--- aquatic_http/src/lib/config.rs | 4 +-- aquatic_http/src/lib/handler.rs | 54 +++++++++++++++++++---------- aquatic_http/src/lib/lib.rs | 9 ++++- aquatic_http/src/lib/network/mod.rs | 28 +++++++++------ 5 files changed, 74 insertions(+), 36 deletions(-) diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs index 107f3d4..258f631 100644 --- a/aquatic_http/src/lib/common.rs +++ b/aquatic_http/src/lib/common.rs @@ -18,6 +18,7 @@ use aquatic_http_protocol::response::{Response, ResponsePeer}; pub const LISTENER_TOKEN: Token = Token(0); +pub const CHANNEL_TOKEN: Token = Token(1); pub trait Ip: Copy + Eq + ::std::hash::Hash {} @@ -150,12 +151,18 @@ pub type RequestChannelReceiver = Receiver<(ConnectionMeta, Request)>; pub type ResponseChannelReceiver = Receiver<(ConnectionMeta, Response)>; -pub struct ResponseChannelSender(Vec>); +pub struct ResponseChannelSender { + senders: Vec>, +} impl ResponseChannelSender { - pub fn new(senders: Vec>) -> Self { - Self(senders) + pub fn new( + senders: Vec>, + ) -> Self { + Self { + senders, + } } #[inline] @@ -164,7 +171,7 @@ impl ResponseChannelSender { meta: ConnectionMeta, message: Response ){ - if let Err(err) = self.0[meta.worker_index].send((meta, message)){ + if let Err(err) = self.senders[meta.worker_index].send((meta, message)){ error!("ResponseChannelSender: couldn't send message: {:?}", err); } } diff --git a/aquatic_http/src/lib/config.rs b/aquatic_http/src/lib/config.rs index a8ccd98..2355869 100644 --- a/aquatic_http/src/lib/config.rs +++ b/aquatic_http/src/lib/config.rs @@ -121,7 +121,7 @@ impl Default for NetworkConfig { tls: TlsConfig::default(), keep_alive: true, poll_event_capacity: 4096, - poll_timeout_microseconds: 10_000, + poll_timeout_microseconds: 200_000, } } } @@ -141,7 +141,7 @@ impl Default for ProtocolConfig { impl Default for HandlerConfig { fn default() -> Self { Self { - max_requests_per_iter: 10000, + max_requests_per_iter: 10_000, channel_recv_timeout_microseconds: 200, } } diff --git a/aquatic_http/src/lib/handler.rs b/aquatic_http/src/lib/handler.rs index c594eca..ec95986 100644 --- a/aquatic_http/src/lib/handler.rs +++ b/aquatic_http/src/lib/handler.rs @@ -2,8 +2,10 @@ use std::collections::BTreeMap; use std::time::Duration; use std::vec::Drain; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::sync::Arc; use either::Either; +use mio::Waker; use parking_lot::MutexGuard; use rand::{Rng, SeedableRng, rngs::SmallRng}; @@ -20,8 +22,9 @@ pub fn run_request_worker( state: State, request_channel_receiver: RequestChannelReceiver, response_channel_sender: ResponseChannelSender, + wakers: Vec>, ){ - let mut responses = Vec::new(); + let mut wake_socket_workers: Vec = (0..config.socket_workers).map(|_| false).collect(); let mut announce_requests = Vec::new(); let mut scrape_requests = Vec::new(); @@ -35,6 +38,9 @@ pub fn run_request_worker( loop { let mut opt_torrent_map_guard: Option> = None; + // If torrent state mutex is locked, just keep collecting requests + // and process them later. This can happen with either multiple + // request workers or while cleaning is underway. for i in 0..config.handlers.max_requests_per_iter { let opt_in_message = if i == 0 { request_channel_receiver.recv().ok() @@ -66,21 +72,27 @@ pub fn run_request_worker( &config, &mut rng, &mut torrent_map_guard, - &mut responses, + &response_channel_sender, + &mut wake_socket_workers, announce_requests.drain(..) ); handle_scrape_requests( &config, &mut torrent_map_guard, - &mut responses, + &response_channel_sender, + &mut wake_socket_workers, scrape_requests.drain(..) ); - ::std::mem::drop(torrent_map_guard); + for (worker_index, wake) in wake_socket_workers.iter_mut().enumerate(){ + if *wake { + if let Err(err) = wakers[worker_index].wake(){ + ::log::error!("request handler couldn't wake poll: {:?}", err); + } - for (meta, response) in responses.drain(..){ - response_channel_sender.send(meta, response); + *wake = false; + } } } } @@ -90,14 +102,15 @@ pub fn handle_announce_requests( config: &Config, rng: &mut impl Rng, torrent_maps: &mut TorrentMaps, - responses: &mut Vec<(ConnectionMeta, Response)>, + response_channel_sender: &ResponseChannelSender, + wake_socket_workers: &mut Vec, requests: Drain<(ConnectionMeta, AnnounceRequest)>, ){ let valid_until = ValidUntil::new(config.cleaning.max_peer_age); - responses.extend(requests.map(|(request_sender_meta, request)| { + for (meta, request) in requests { let peer_ip = convert_ipv4_mapped_ipv6( - request_sender_meta.peer_addr.ip() + meta.peer_addr.ip() ); ::log::debug!("peer ip: {:?}", peer_ip); @@ -109,8 +122,8 @@ pub fn handle_announce_requests( .or_default(); let peer_connection_meta = PeerConnectionMeta { - worker_index: request_sender_meta.worker_index, - poll_token: request_sender_meta.poll_token, + worker_index: meta.worker_index, + poll_token: meta.poll_token, peer_ip_address, }; @@ -139,8 +152,8 @@ pub fn handle_announce_requests( .or_default(); let peer_connection_meta = PeerConnectionMeta { - worker_index: request_sender_meta.worker_index, - poll_token: request_sender_meta.poll_token, + worker_index: meta.worker_index, + poll_token: meta.poll_token, peer_ip_address }; @@ -165,8 +178,9 @@ pub fn handle_announce_requests( }, }; - (request_sender_meta, response) - })); + response_channel_sender.send(meta, response); + wake_socket_workers[meta.worker_index] = true; + }; } @@ -250,10 +264,11 @@ fn upsert_peer_and_get_response_peers( pub fn handle_scrape_requests( config: &Config, torrent_maps: &mut TorrentMaps, - messages_out: &mut Vec<(ConnectionMeta, Response)>, + response_channel_sender: &ResponseChannelSender, + wake_socket_workers: &mut Vec, requests: Drain<(ConnectionMeta, ScrapeRequest)>, ){ - messages_out.extend(requests.map(|(meta, request)| { + for (meta, request) in requests { let num_to_take = request.info_hashes.len().min( config.protocol.max_scrape_torrents ); @@ -295,6 +310,7 @@ pub fn handle_scrape_requests( }; - (meta, Response::Scrape(response)) - })); + response_channel_sender.send(meta, Response::Scrape(response)); + wake_socket_workers[meta.worker_index] = true; + }; } \ No newline at end of file diff --git a/aquatic_http/src/lib/lib.rs b/aquatic_http/src/lib/lib.rs index 10df8fb..979dc87 100644 --- a/aquatic_http/src/lib/lib.rs +++ b/aquatic_http/src/lib/lib.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::thread::Builder; use anyhow::Context; +use mio::{Poll, Waker}; use parking_lot::Mutex; use privdrop::PrivDrop; @@ -25,6 +26,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { let (request_channel_sender, request_channel_receiver) = ::crossbeam_channel::unbounded(); let mut out_message_senders = Vec::new(); + let mut wakers = Vec::new(); let socket_worker_statuses: SocketWorkerStatuses = { let mut statuses = Vec::new(); @@ -41,10 +43,13 @@ pub fn run(config: Config) -> anyhow::Result<()> { let socket_worker_statuses = socket_worker_statuses.clone(); let request_channel_sender = request_channel_sender.clone(); let opt_tls_acceptor = opt_tls_acceptor.clone(); + let poll = Poll::new().expect("create poll"); + let waker = Arc::new(Waker::new(poll.registry(), CHANNEL_TOKEN).expect("create waker")); let (response_channel_sender, response_channel_receiver) = ::crossbeam_channel::unbounded(); out_message_senders.push(response_channel_sender); + wakers.push(waker); Builder::new().name(format!("socket-{:02}", i + 1)).spawn(move || { network::run_socket_worker( @@ -53,7 +58,8 @@ pub fn run(config: Config) -> anyhow::Result<()> { socket_worker_statuses, request_channel_sender, response_channel_receiver, - opt_tls_acceptor + opt_tls_acceptor, + poll ); })?; } @@ -97,6 +103,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { state, request_channel_receiver, response_channel_sender, + wakers, ); })?; } diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index 5aa2085..d11cdd1 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -32,6 +32,7 @@ pub fn run_socket_worker( request_channel_sender: RequestChannelSender, response_channel_receiver: ResponseChannelReceiver, opt_tls_acceptor: Option, + poll: Poll, ){ match create_listener(config.network.address, config.network.ipv6_only){ Ok(listener) => { @@ -43,7 +44,8 @@ pub fn run_socket_worker( request_channel_sender, response_channel_receiver, listener, - opt_tls_acceptor + opt_tls_acceptor, + poll, ); }, Err(err) => { @@ -62,13 +64,13 @@ pub fn run_poll_loop( response_channel_receiver: ResponseChannelReceiver, listener: ::std::net::TcpListener, opt_tls_acceptor: Option, + mut poll: Poll, ){ let poll_timeout = Duration::from_micros( config.network.poll_timeout_microseconds ); let mut listener = TcpListener::from_std(listener); - let mut poll = Poll::new().expect("create poll"); let mut events = Events::with_capacity(config.network.poll_event_capacity); poll.registry() @@ -101,7 +103,7 @@ pub fn run_poll_loop( &mut poll_token_counter, &opt_tls_acceptor, ); - } else { + } else if token != CHANNEL_TOKEN { handle_connection_read_event( &config, socket_worker_index, @@ -112,15 +114,16 @@ pub fn run_poll_loop( token, ); } - } - if !(local_responses.is_empty() & (response_channel_receiver.is_empty())) { + // Send responses for each event. Channel token is not interesting + // by itself, but is just for making sure responses are sent even + // if no new connects / requests come in. send_responses( &config, &mut poll, &mut response_buffer, local_responses.drain(..), - response_channel_receiver.try_iter(), + &response_channel_receiver, &mut connections ); } @@ -150,8 +153,9 @@ fn accept_new_streams( Ok((mut stream, _)) => { poll_token_counter.0 = poll_token_counter.0.wrapping_add(1); - if *poll_token_counter == LISTENER_TOKEN { - poll_token_counter.0 = 1; + // Skip listener and channel tokens + if poll_token_counter.0 < 2 { + poll_token_counter.0 = 2; } let token = *poll_token_counter; @@ -312,10 +316,14 @@ pub fn send_responses( poll: &mut Poll, buffer: &mut Cursor<&mut [u8]>, local_responses: Drain<(ConnectionMeta, Response)>, - channel_responses: crossbeam_channel::TryIter<(ConnectionMeta, Response)>, + channel_responses: &ResponseChannelReceiver, connections: &mut ConnectionMap, ){ - for (meta, response) in local_responses.chain(channel_responses){ + let channel_responses_len = channel_responses.len(); + let channel_responses_drain = channel_responses.try_iter() + .take(channel_responses_len); + + for (meta, response) in local_responses.chain(channel_responses_drain){ if let Some(established) = connections.get_mut(&meta.poll_token) .and_then(Connection::get_established) {