start work on using separate threads for sockets and handlers

This commit is contained in:
Joakim Frostegård 2020-04-11 19:35:15 +02:00
parent 8d7cbb7926
commit 3527952242
12 changed files with 180 additions and 98 deletions

View file

@ -14,6 +14,8 @@ name = "aquatic"
[dependencies]
bittorrent_udp = { path = "../bittorrent_udp" }
cli_helpers = { path = "../cli_helpers" }
crossbeam-queue = "0.2"
crossbeam-utils = "0.7"
dashmap = "3"
histogram = "0.6"
indexmap = "1"

View file

@ -3,11 +3,14 @@ use std::sync::atomic::AtomicUsize;
use std::net::{SocketAddr, IpAddr};
use std::time::Instant;
use crossbeam_queue::ArrayQueue;
use dashmap::DashMap;
use indexmap::IndexMap;
pub use bittorrent_udp::types::*;
use crate::config::Config;
pub const MAX_PACKET_SIZE: usize = 4096;
@ -137,14 +140,18 @@ pub struct State {
pub connections: Arc<ConnectionMap>,
pub torrents: Arc<TorrentMap>,
pub statistics: Arc<Statistics>,
pub request_queue: Arc<ArrayQueue<(Request, SocketAddr)>>,
pub response_queue: Arc<ArrayQueue<(Response, SocketAddr)>>,
}
impl State {
pub fn new() -> Self {
pub fn new(config: &Config) -> Self {
Self {
connections: Arc::new(DashMap::new()),
torrents: Arc::new(DashMap::new()),
statistics: Arc::new(Statistics::default()),
request_queue: Arc::new(ArrayQueue::new(config.request_queue_len)),
response_queue: Arc::new(ArrayQueue::new(config.response_queue_len)),
}
}
}

View file

@ -6,7 +6,10 @@ use serde::{Serialize, Deserialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
/// Spawn this number of threads for workers
pub num_threads: usize,
pub socket_workers: usize,
pub response_workers: usize,
pub request_queue_len: usize,
pub response_queue_len: usize,
pub network: NetworkConfig,
pub statistics: StatisticsConfig,
pub cleaning: CleaningConfig,
@ -50,7 +53,10 @@ pub struct CleaningConfig {
impl Default for Config {
fn default() -> Self {
Self {
num_threads: 4,
socket_workers: 1,
response_workers: 1,
request_queue_len: 4096,
response_queue_len: 4096 * 4,
network: NetworkConfig::default(),
statistics: StatisticsConfig::default(),
cleaning: CleaningConfig::default(),

View file

@ -3,7 +3,8 @@ use std::sync::atomic::Ordering;
use std::time::Instant;
use std::vec::Drain;
use rand::{Rng, rngs::{SmallRng, StdRng}};
use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}};
use crossbeam_utils::Backoff;
use bittorrent_udp::types::*;
@ -11,16 +12,66 @@ use crate::common::*;
use crate::config::Config;
pub fn handle(
state: State,
config: Config,
){
let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::new();
let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new();
let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new();
let mut std_rng = StdRng::from_entropy();
let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap();
let backoff = Backoff::new();
loop {
if state.request_queue.is_empty(){
backoff.snooze();
} else {
while let Ok((request, src)) = state.request_queue.pop(){
match request {
Request::Connect(r) => {
connect_requests.push((r, src))
},
Request::Announce(r) => {
announce_requests.push((r, src))
},
Request::Scrape(r) => {
scrape_requests.push((r, src))
},
}
}
handle_connect_requests(
&state,
&mut std_rng,
connect_requests.drain(..)
);
handle_announce_requests(
&state,
&config,
&mut small_rng,
announce_requests.drain(..),
);
handle_scrape_requests(
&state,
scrape_requests.drain(..),
);
}
}
}
#[inline]
pub fn handle_connect_requests(
state: &State,
rng: &mut StdRng,
responses: &mut Vec<(Response, SocketAddr)>,
requests: Drain<(ConnectRequest, SocketAddr)>,
){
let now = Time(Instant::now());
responses.extend(requests.map(|(request, src)| {
for (request, src) in requests {
let connection_id = ConnectionId(rng.gen());
let key = ConnectionKey {
@ -37,8 +88,10 @@ pub fn handle_connect_requests(
}
);
(response, src)
}));
if let Err(err) = state.response_queue.push((response, src)){
eprintln!("couldn't push to response queue: {}", err);
}
}
}
@ -47,20 +100,23 @@ pub fn handle_announce_requests(
state: &State,
config: &Config,
rng: &mut SmallRng,
responses: &mut Vec<(Response, SocketAddr)>,
requests: Drain<(AnnounceRequest, SocketAddr)>,
){
responses.extend(requests.map(|(request, src)| {
for (request, src) in requests {
let connection_key = ConnectionKey {
connection_id: request.connection_id,
socket_addr: src,
};
if !state.connections.contains_key(&connection_key){
return ((ErrorResponse {
let response = ErrorResponse {
transaction_id: request.transaction_id,
message: "Connection invalid or expired".to_string()
}).into(), src);
};
if let Err(err) = state.response_queue.push((response.into(), src)){
eprintln!("couldn't push to response queue: {}", err);
}
}
let peer_key = PeerMapKey {
@ -124,30 +180,35 @@ pub fn handle_announce_requests(
peers: response_peers
});
(response, src)
}));
if let Err(err) = state.response_queue.push((response, src)){
eprintln!("couldn't push to response queue: {}", err);
}
}
}
#[inline]
pub fn handle_scrape_requests(
state: &State,
responses: &mut Vec<(Response, SocketAddr)>,
requests: Drain<(ScrapeRequest, SocketAddr)>,
){
let empty_stats = create_torrent_scrape_statistics(0, 0);
responses.extend(requests.map(|(request, src)| {
for (request, src) in requests {
let connection_key = ConnectionKey {
connection_id: request.connection_id,
socket_addr: src,
};
if !state.connections.contains_key(&connection_key){
return ((ErrorResponse {
let response = ErrorResponse {
transaction_id: request.transaction_id,
message: "Connection invalid or expired".to_string()
}).into(), src);
};
if let Err(err) = state.response_queue.push((response.into(), src)){
eprintln!("couldn't push to response queue: {}", err);
}
}
let mut stats: Vec<TorrentScrapeStatistics> = Vec::with_capacity(
@ -170,8 +231,10 @@ pub fn handle_scrape_requests(
torrent_stats: stats,
});
(response, src)
}));
if let Err(err) = state.response_queue.push((response, src)){
eprintln!("couldn't push to response queue: {}", err);
}
};
}

View file

@ -11,9 +11,18 @@ use common::State;
pub fn run(config: Config){
let state = State::new();
let state = State::new(&config);
for i in 0..config.num_threads {
for _ in 0..config.response_workers {
let state = state.clone();
let config = config.clone();
::std::thread::spawn(move || {
handlers::handle(state, config);
});
}
for i in 0..config.socket_workers {
let state = state.clone();
let config = config.clone();

View file

@ -1,19 +1,17 @@
use std::sync::atomic::Ordering;
use std::net::SocketAddr;
use std::io::{Cursor, ErrorKind};
use std::time::Duration;
use mio::{Events, Poll, Interest, Token};
use mio::net::UdpSocket;
use net2::{UdpSocketExt, UdpBuilder};
use net2::unix::UnixUdpBuilderExt;
use rand::{SeedableRng, rngs::{SmallRng, StdRng}};
use bittorrent_udp::types::IpVersion;
use bittorrent_udp::converters::{response_to_bytes, request_from_bytes};
use crate::common::*;
use crate::config::Config;
use crate::handlers::*;
pub fn run_event_loop(
@ -34,16 +32,10 @@ pub fn run_event_loop(
let mut events = Events::with_capacity(config.network.poll_event_capacity);
let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::new();
let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new();
let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new();
let mut responses: Vec<(Response, SocketAddr)> = Vec::new();
let mut std_rng = StdRng::from_entropy();
let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap();
let timeout = Duration::from_millis(1);
loop {
poll.poll(&mut events, None)
poll.poll(&mut events, Some(timeout))
.expect("failed polling");
for event in events.iter(){
@ -51,17 +43,11 @@ pub fn run_event_loop(
if token.0 == token_num {
if event.is_readable(){
handle_readable_socket(
read_requests(
&state,
&config,
&mut socket,
&mut std_rng,
&mut small_rng,
&mut buffer,
&mut responses,
&mut connect_requests,
&mut announce_requests,
&mut scrape_requests
);
state.statistics.readable_events.fetch_add(1, Ordering::SeqCst);
@ -72,6 +58,13 @@ pub fn run_event_loop(
}
}
}
send_responses(
&state,
&config,
&mut socket,
&mut buffer,
);
}
}
@ -108,24 +101,15 @@ fn create_socket(config: &Config) -> ::std::net::UdpSocket {
}
/// Read requests, generate and send back responses
#[inline]
fn handle_readable_socket(
fn read_requests(
state: &State,
config: &Config,
socket: &mut UdpSocket,
std_rng: &mut StdRng,
small_rng: &mut SmallRng,
buffer: &mut [u8],
responses: &mut Vec<(Response, SocketAddr)>,
connect_requests: &mut Vec<(ConnectRequest, SocketAddr)>,
announce_requests: &mut Vec<(AnnounceRequest, SocketAddr)>,
scrape_requests: &mut Vec<(ScrapeRequest, SocketAddr)>,
){
let mut requests_received: usize = 0;
let mut responses_sent: usize = 0;
let mut bytes_received: usize = 0;
let mut bytes_sent: usize = 0;
loop {
match socket.recv_from(&mut buffer[..]) {
@ -142,14 +126,12 @@ fn handle_readable_socket(
}
match request {
Ok(Request::Connect(r)) => {
connect_requests.push((r, src));
},
Ok(Request::Announce(r)) => {
announce_requests.push((r, src));
},
Ok(Request::Scrape(r)) => {
scrape_requests.push((r, src));
Ok(request) => {
let res = state.request_queue.push((request, src));
if let Err(err) = res {
eprintln!("couldn't push request to queue: {}", err);
}
},
Err(err) => {
eprintln!("request_from_bytes error: {:?}", err);
@ -169,7 +151,7 @@ fn handle_readable_socket(
message,
};
responses.push((response.into(), src));
// responses.push((response.into(), src)); // FIXME
}
}
},
@ -185,28 +167,28 @@ fn handle_readable_socket(
}
}
handle_connect_requests(
state,
std_rng,
responses,
connect_requests.drain(..)
);
handle_announce_requests(
state,
config,
small_rng,
responses,
announce_requests.drain(..),
);
handle_scrape_requests(
state,
responses,
scrape_requests.drain(..),
);
if config.statistics.interval != 0 {
state.statistics.requests_received
.fetch_add(requests_received, Ordering::SeqCst);
state.statistics.bytes_received
.fetch_add(bytes_received, Ordering::SeqCst);
}
}
#[inline]
fn send_responses(
state: &State,
config: &Config,
socket: &mut UdpSocket,
buffer: &mut [u8],
){
let mut responses_sent: usize = 0;
let mut bytes_sent: usize = 0;
let mut cursor = Cursor::new(buffer);
for (response, src) in responses.drain(..) {
while let Ok((response, src)) = state.response_queue.pop(){
cursor.set_position(0);
response_to_bytes(&mut cursor, response, IpVersion::IPv4).unwrap();
@ -229,12 +211,8 @@ fn handle_readable_socket(
}
if config.statistics.interval != 0 {
state.statistics.requests_received
.fetch_add(requests_received, Ordering::SeqCst);
state.statistics.responses_sent
.fetch_add(responses_sent, Ordering::SeqCst);
state.statistics.bytes_received
.fetch_add(bytes_received, Ordering::SeqCst);
state.statistics.bytes_sent
.fetch_add(bytes_sent, Ordering::SeqCst);
}