aquatic_http: send responses for each event, use mio poll waker

This means
* less fluctuation in number of responses send per second
* longer poll timeouts can be used since poll is woken when
  responses are available for sending
* drain-like method used to fetch responses from response
  channel, meaning responses added while iterating won't
  be processed
This commit is contained in:
Joakim Frostegård 2020-08-10 02:48:28 +02:00
parent 1d2dfe3fe1
commit cce7bd7150
5 changed files with 74 additions and 36 deletions

View file

@ -18,6 +18,7 @@ use aquatic_http_protocol::response::{Response, ResponsePeer};
pub const LISTENER_TOKEN: Token = Token(0); pub const LISTENER_TOKEN: Token = Token(0);
pub const CHANNEL_TOKEN: Token = Token(1);
pub trait Ip: Copy + Eq + ::std::hash::Hash {} pub trait Ip: Copy + Eq + ::std::hash::Hash {}
@ -150,12 +151,18 @@ pub type RequestChannelReceiver = Receiver<(ConnectionMeta, Request)>;
pub type ResponseChannelReceiver = Receiver<(ConnectionMeta, Response)>; pub type ResponseChannelReceiver = Receiver<(ConnectionMeta, Response)>;
pub struct ResponseChannelSender(Vec<Sender<(ConnectionMeta, Response)>>); pub struct ResponseChannelSender {
senders: Vec<Sender<(ConnectionMeta, Response)>>,
}
impl ResponseChannelSender { impl ResponseChannelSender {
pub fn new(senders: Vec<Sender<(ConnectionMeta, Response)>>) -> Self { pub fn new(
Self(senders) senders: Vec<Sender<(ConnectionMeta, Response)>>,
) -> Self {
Self {
senders,
}
} }
#[inline] #[inline]
@ -164,7 +171,7 @@ impl ResponseChannelSender {
meta: ConnectionMeta, meta: ConnectionMeta,
message: Response message: Response
){ ){
if let Err(err) = self.0[meta.worker_index].send((meta, message)){ if let Err(err) = self.senders[meta.worker_index].send((meta, message)){
error!("ResponseChannelSender: couldn't send message: {:?}", err); error!("ResponseChannelSender: couldn't send message: {:?}", err);
} }
} }

View file

@ -121,7 +121,7 @@ impl Default for NetworkConfig {
tls: TlsConfig::default(), tls: TlsConfig::default(),
keep_alive: true, keep_alive: true,
poll_event_capacity: 4096, poll_event_capacity: 4096,
poll_timeout_microseconds: 10_000, poll_timeout_microseconds: 200_000,
} }
} }
} }
@ -141,7 +141,7 @@ impl Default for ProtocolConfig {
impl Default for HandlerConfig { impl Default for HandlerConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
max_requests_per_iter: 10000, max_requests_per_iter: 10_000,
channel_recv_timeout_microseconds: 200, channel_recv_timeout_microseconds: 200,
} }
} }

View file

@ -2,8 +2,10 @@ use std::collections::BTreeMap;
use std::time::Duration; use std::time::Duration;
use std::vec::Drain; use std::vec::Drain;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
use either::Either; use either::Either;
use mio::Waker;
use parking_lot::MutexGuard; use parking_lot::MutexGuard;
use rand::{Rng, SeedableRng, rngs::SmallRng}; use rand::{Rng, SeedableRng, rngs::SmallRng};
@ -20,8 +22,9 @@ pub fn run_request_worker(
state: State, state: State,
request_channel_receiver: RequestChannelReceiver, request_channel_receiver: RequestChannelReceiver,
response_channel_sender: ResponseChannelSender, response_channel_sender: ResponseChannelSender,
wakers: Vec<Arc<Waker>>,
){ ){
let mut responses = Vec::new(); let mut wake_socket_workers: Vec<bool> = (0..config.socket_workers).map(|_| false).collect();
let mut announce_requests = Vec::new(); let mut announce_requests = Vec::new();
let mut scrape_requests = Vec::new(); let mut scrape_requests = Vec::new();
@ -35,6 +38,9 @@ pub fn run_request_worker(
loop { loop {
let mut opt_torrent_map_guard: Option<MutexGuard<TorrentMaps>> = None; let mut opt_torrent_map_guard: Option<MutexGuard<TorrentMaps>> = None;
// If torrent state mutex is locked, just keep collecting requests
// and process them later. This can happen with either multiple
// request workers or while cleaning is underway.
for i in 0..config.handlers.max_requests_per_iter { for i in 0..config.handlers.max_requests_per_iter {
let opt_in_message = if i == 0 { let opt_in_message = if i == 0 {
request_channel_receiver.recv().ok() request_channel_receiver.recv().ok()
@ -66,21 +72,27 @@ pub fn run_request_worker(
&config, &config,
&mut rng, &mut rng,
&mut torrent_map_guard, &mut torrent_map_guard,
&mut responses, &response_channel_sender,
&mut wake_socket_workers,
announce_requests.drain(..) announce_requests.drain(..)
); );
handle_scrape_requests( handle_scrape_requests(
&config, &config,
&mut torrent_map_guard, &mut torrent_map_guard,
&mut responses, &response_channel_sender,
&mut wake_socket_workers,
scrape_requests.drain(..) scrape_requests.drain(..)
); );
::std::mem::drop(torrent_map_guard); for (worker_index, wake) in wake_socket_workers.iter_mut().enumerate(){
if *wake {
if let Err(err) = wakers[worker_index].wake(){
::log::error!("request handler couldn't wake poll: {:?}", err);
}
for (meta, response) in responses.drain(..){ *wake = false;
response_channel_sender.send(meta, response); }
} }
} }
} }
@ -90,14 +102,15 @@ pub fn handle_announce_requests(
config: &Config, config: &Config,
rng: &mut impl Rng, rng: &mut impl Rng,
torrent_maps: &mut TorrentMaps, torrent_maps: &mut TorrentMaps,
responses: &mut Vec<(ConnectionMeta, Response)>, response_channel_sender: &ResponseChannelSender,
wake_socket_workers: &mut Vec<bool>,
requests: Drain<(ConnectionMeta, AnnounceRequest)>, requests: Drain<(ConnectionMeta, AnnounceRequest)>,
){ ){
let valid_until = ValidUntil::new(config.cleaning.max_peer_age); let valid_until = ValidUntil::new(config.cleaning.max_peer_age);
responses.extend(requests.map(|(request_sender_meta, request)| { for (meta, request) in requests {
let peer_ip = convert_ipv4_mapped_ipv6( let peer_ip = convert_ipv4_mapped_ipv6(
request_sender_meta.peer_addr.ip() meta.peer_addr.ip()
); );
::log::debug!("peer ip: {:?}", peer_ip); ::log::debug!("peer ip: {:?}", peer_ip);
@ -109,8 +122,8 @@ pub fn handle_announce_requests(
.or_default(); .or_default();
let peer_connection_meta = PeerConnectionMeta { let peer_connection_meta = PeerConnectionMeta {
worker_index: request_sender_meta.worker_index, worker_index: meta.worker_index,
poll_token: request_sender_meta.poll_token, poll_token: meta.poll_token,
peer_ip_address, peer_ip_address,
}; };
@ -139,8 +152,8 @@ pub fn handle_announce_requests(
.or_default(); .or_default();
let peer_connection_meta = PeerConnectionMeta { let peer_connection_meta = PeerConnectionMeta {
worker_index: request_sender_meta.worker_index, worker_index: meta.worker_index,
poll_token: request_sender_meta.poll_token, poll_token: meta.poll_token,
peer_ip_address peer_ip_address
}; };
@ -165,8 +178,9 @@ pub fn handle_announce_requests(
}, },
}; };
(request_sender_meta, response) response_channel_sender.send(meta, response);
})); wake_socket_workers[meta.worker_index] = true;
};
} }
@ -250,10 +264,11 @@ fn upsert_peer_and_get_response_peers<I: Ip>(
pub fn handle_scrape_requests( pub fn handle_scrape_requests(
config: &Config, config: &Config,
torrent_maps: &mut TorrentMaps, torrent_maps: &mut TorrentMaps,
messages_out: &mut Vec<(ConnectionMeta, Response)>, response_channel_sender: &ResponseChannelSender,
wake_socket_workers: &mut Vec<bool>,
requests: Drain<(ConnectionMeta, ScrapeRequest)>, requests: Drain<(ConnectionMeta, ScrapeRequest)>,
){ ){
messages_out.extend(requests.map(|(meta, request)| { for (meta, request) in requests {
let num_to_take = request.info_hashes.len().min( let num_to_take = request.info_hashes.len().min(
config.protocol.max_scrape_torrents config.protocol.max_scrape_torrents
); );
@ -295,6 +310,7 @@ pub fn handle_scrape_requests(
}; };
(meta, Response::Scrape(response)) response_channel_sender.send(meta, Response::Scrape(response));
})); wake_socket_workers[meta.worker_index] = true;
};
} }

View file

@ -3,6 +3,7 @@ use std::sync::Arc;
use std::thread::Builder; use std::thread::Builder;
use anyhow::Context; use anyhow::Context;
use mio::{Poll, Waker};
use parking_lot::Mutex; use parking_lot::Mutex;
use privdrop::PrivDrop; use privdrop::PrivDrop;
@ -25,6 +26,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let (request_channel_sender, request_channel_receiver) = ::crossbeam_channel::unbounded(); let (request_channel_sender, request_channel_receiver) = ::crossbeam_channel::unbounded();
let mut out_message_senders = Vec::new(); let mut out_message_senders = Vec::new();
let mut wakers = Vec::new();
let socket_worker_statuses: SocketWorkerStatuses = { let socket_worker_statuses: SocketWorkerStatuses = {
let mut statuses = Vec::new(); let mut statuses = Vec::new();
@ -41,10 +43,13 @@ pub fn run(config: Config) -> anyhow::Result<()> {
let socket_worker_statuses = socket_worker_statuses.clone(); let socket_worker_statuses = socket_worker_statuses.clone();
let request_channel_sender = request_channel_sender.clone(); let request_channel_sender = request_channel_sender.clone();
let opt_tls_acceptor = opt_tls_acceptor.clone(); let opt_tls_acceptor = opt_tls_acceptor.clone();
let poll = Poll::new().expect("create poll");
let waker = Arc::new(Waker::new(poll.registry(), CHANNEL_TOKEN).expect("create waker"));
let (response_channel_sender, response_channel_receiver) = ::crossbeam_channel::unbounded(); let (response_channel_sender, response_channel_receiver) = ::crossbeam_channel::unbounded();
out_message_senders.push(response_channel_sender); out_message_senders.push(response_channel_sender);
wakers.push(waker);
Builder::new().name(format!("socket-{:02}", i + 1)).spawn(move || { Builder::new().name(format!("socket-{:02}", i + 1)).spawn(move || {
network::run_socket_worker( network::run_socket_worker(
@ -53,7 +58,8 @@ pub fn run(config: Config) -> anyhow::Result<()> {
socket_worker_statuses, socket_worker_statuses,
request_channel_sender, request_channel_sender,
response_channel_receiver, response_channel_receiver,
opt_tls_acceptor opt_tls_acceptor,
poll
); );
})?; })?;
} }
@ -97,6 +103,7 @@ pub fn run(config: Config) -> anyhow::Result<()> {
state, state,
request_channel_receiver, request_channel_receiver,
response_channel_sender, response_channel_sender,
wakers,
); );
})?; })?;
} }

View file

@ -32,6 +32,7 @@ pub fn run_socket_worker(
request_channel_sender: RequestChannelSender, request_channel_sender: RequestChannelSender,
response_channel_receiver: ResponseChannelReceiver, response_channel_receiver: ResponseChannelReceiver,
opt_tls_acceptor: Option<TlsAcceptor>, opt_tls_acceptor: Option<TlsAcceptor>,
poll: Poll,
){ ){
match create_listener(config.network.address, config.network.ipv6_only){ match create_listener(config.network.address, config.network.ipv6_only){
Ok(listener) => { Ok(listener) => {
@ -43,7 +44,8 @@ pub fn run_socket_worker(
request_channel_sender, request_channel_sender,
response_channel_receiver, response_channel_receiver,
listener, listener,
opt_tls_acceptor opt_tls_acceptor,
poll,
); );
}, },
Err(err) => { Err(err) => {
@ -62,13 +64,13 @@ pub fn run_poll_loop(
response_channel_receiver: ResponseChannelReceiver, response_channel_receiver: ResponseChannelReceiver,
listener: ::std::net::TcpListener, listener: ::std::net::TcpListener,
opt_tls_acceptor: Option<TlsAcceptor>, opt_tls_acceptor: Option<TlsAcceptor>,
mut poll: Poll,
){ ){
let poll_timeout = Duration::from_micros( let poll_timeout = Duration::from_micros(
config.network.poll_timeout_microseconds config.network.poll_timeout_microseconds
); );
let mut listener = TcpListener::from_std(listener); let mut listener = TcpListener::from_std(listener);
let mut poll = Poll::new().expect("create poll");
let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut events = Events::with_capacity(config.network.poll_event_capacity);
poll.registry() poll.registry()
@ -101,7 +103,7 @@ pub fn run_poll_loop(
&mut poll_token_counter, &mut poll_token_counter,
&opt_tls_acceptor, &opt_tls_acceptor,
); );
} else { } else if token != CHANNEL_TOKEN {
handle_connection_read_event( handle_connection_read_event(
&config, &config,
socket_worker_index, socket_worker_index,
@ -112,15 +114,16 @@ pub fn run_poll_loop(
token, token,
); );
} }
}
if !(local_responses.is_empty() & (response_channel_receiver.is_empty())) { // Send responses for each event. Channel token is not interesting
// by itself, but is just for making sure responses are sent even
// if no new connects / requests come in.
send_responses( send_responses(
&config, &config,
&mut poll, &mut poll,
&mut response_buffer, &mut response_buffer,
local_responses.drain(..), local_responses.drain(..),
response_channel_receiver.try_iter(), &response_channel_receiver,
&mut connections &mut connections
); );
} }
@ -150,8 +153,9 @@ fn accept_new_streams(
Ok((mut stream, _)) => { Ok((mut stream, _)) => {
poll_token_counter.0 = poll_token_counter.0.wrapping_add(1); poll_token_counter.0 = poll_token_counter.0.wrapping_add(1);
if *poll_token_counter == LISTENER_TOKEN { // Skip listener and channel tokens
poll_token_counter.0 = 1; if poll_token_counter.0 < 2 {
poll_token_counter.0 = 2;
} }
let token = *poll_token_counter; let token = *poll_token_counter;
@ -312,10 +316,14 @@ pub fn send_responses(
poll: &mut Poll, poll: &mut Poll,
buffer: &mut Cursor<&mut [u8]>, buffer: &mut Cursor<&mut [u8]>,
local_responses: Drain<(ConnectionMeta, Response)>, local_responses: Drain<(ConnectionMeta, Response)>,
channel_responses: crossbeam_channel::TryIter<(ConnectionMeta, Response)>, channel_responses: &ResponseChannelReceiver,
connections: &mut ConnectionMap, connections: &mut ConnectionMap,
){ ){
for (meta, response) in local_responses.chain(channel_responses){ let channel_responses_len = channel_responses.len();
let channel_responses_drain = channel_responses.try_iter()
.take(channel_responses_len);
for (meta, response) in local_responses.chain(channel_responses_drain){
if let Some(established) = connections.get_mut(&meta.poll_token) if let Some(established) = connections.get_mut(&meta.poll_token)
.and_then(Connection::get_established) .and_then(Connection::get_established)
{ {