From 7616df96864879ac8dce1e201b22b3eb1dc8a004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 01:14:32 +0200 Subject: [PATCH 01/15] aquatic_udp: validate requests in socket workers Also, don't send error responses for unconnected requests --- TODO.md | 2 - aquatic_udp/src/lib/common.rs | 7 +- aquatic_udp/src/lib/handlers/connect.rs | 39 ----------- aquatic_udp/src/lib/handlers/mod.rs | 90 +++++-------------------- aquatic_udp/src/lib/lib.rs | 1 - aquatic_udp/src/lib/network.rs | 68 ++++++++++++++----- aquatic_udp/src/lib/tasks.rs | 10 --- aquatic_udp_bench/src/announce.rs | 20 ++---- aquatic_udp_bench/src/connect.rs | 80 ---------------------- aquatic_udp_bench/src/main.rs | 11 --- aquatic_udp_bench/src/scrape.rs | 19 ++---- 11 files changed, 88 insertions(+), 259 deletions(-) delete mode 100644 aquatic_udp/src/lib/handlers/connect.rs delete mode 100644 aquatic_udp_bench/src/connect.rs diff --git a/TODO.md b/TODO.md index 66c2b6f..45e45d5 100644 --- a/TODO.md +++ b/TODO.md @@ -1,7 +1,5 @@ # TODO -* aquatic_udp needs to check connection validity before sending error responses! connection validity checks could be moved to socket workers, since theyare sharded by ip - * access lists: * use arc-swap Cache * test functionality diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index 67f0f4b..132666c 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -30,6 +30,11 @@ impl Ip for Ipv6Addr { } } +pub enum ConnectedRequest { + Announce(AnnounceRequest), + Scrape(ScrapeRequest), +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ConnectionKey { pub connection_id: ConnectionId, @@ -180,7 +185,6 @@ pub struct Statistics { #[derive(Clone)] pub struct State { pub access_list: Arc, - pub connections: Arc>, pub torrents: Arc>, pub statistics: Arc, } @@ -189,7 +193,6 @@ impl Default for State { fn default() -> Self { Self { access_list: Arc::new(AccessList::default()), - connections: Arc::new(Mutex::new(HashMap::new())), torrents: Arc::new(Mutex::new(TorrentMaps::default())), statistics: Arc::new(Statistics::default()), } diff --git a/aquatic_udp/src/lib/handlers/connect.rs b/aquatic_udp/src/lib/handlers/connect.rs deleted file mode 100644 index e058241..0000000 --- a/aquatic_udp/src/lib/handlers/connect.rs +++ /dev/null @@ -1,39 +0,0 @@ -use std::net::SocketAddr; -use std::vec::Drain; - -use parking_lot::MutexGuard; -use rand::{rngs::StdRng, Rng}; - -use aquatic_udp_protocol::*; - -use crate::common::*; -use crate::config::Config; - -#[inline] -pub fn handle_connect_requests( - config: &Config, - connections: &mut MutexGuard, - rng: &mut StdRng, - requests: Drain<(ConnectRequest, SocketAddr)>, - responses: &mut Vec<(Response, SocketAddr)>, -) { - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); - - responses.extend(requests.map(|(request, src)| { - let connection_id = ConnectionId(rng.gen()); - - let key = ConnectionKey { - connection_id, - socket_addr: src, - }; - - connections.insert(key, valid_until); - - let response = Response::Connect(ConnectResponse { - connection_id, - transaction_id: request.transaction_id, - }); - - (response, src) - })); -} diff --git a/aquatic_udp/src/lib/handlers/mod.rs b/aquatic_udp/src/lib/handlers/mod.rs index a1906d9..5cb7d39 100644 --- a/aquatic_udp/src/lib/handlers/mod.rs +++ b/aquatic_udp/src/lib/handlers/mod.rs @@ -14,20 +14,17 @@ use crate::common::*; use crate::config::Config; mod announce; -mod connect; mod scrape; use announce::handle_announce_requests; -use connect::handle_connect_requests; use scrape::handle_scrape_requests; pub fn run_request_worker( state: State, config: Config, - request_receiver: Receiver<(Request, SocketAddr)>, + request_receiver: Receiver<(ConnectedRequest, SocketAddr)>, response_sender: Sender<(Response, SocketAddr)>, ) { - let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::new(); let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); @@ -39,15 +36,15 @@ pub fn run_request_worker( let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds); loop { - let mut opt_connections = None; + let mut opt_torrents = None; // Collect requests from channel, divide them by type // // Collect a maximum number of request. Stop collecting before that // number is reached if having waited for too long for a request, but - // only if ConnectionMap mutex isn't locked. + // only if TorrentMaps mutex isn't locked. for i in 0..config.handlers.max_requests_per_iter { - let (request, src): (Request, SocketAddr) = if i == 0 { + let (request, src): (ConnectedRequest, SocketAddr) = if i == 0 { match request_receiver.recv() { Ok(r) => r, Err(_) => break, // Really shouldn't happen @@ -56,8 +53,8 @@ pub fn run_request_worker( match request_receiver.recv_timeout(timeout) { Ok(r) => r, Err(_) => { - if let Some(guard) = state.connections.try_lock() { - opt_connections = Some(guard); + if let Some(guard) = state.torrents.try_lock() { + opt_torrents = Some(guard); break; } else { @@ -68,69 +65,25 @@ pub fn run_request_worker( }; match request { - Request::Connect(r) => connect_requests.push((r, src)), - Request::Announce(r) => announce_requests.push((r, src)), - Request::Scrape(r) => scrape_requests.push((r, src)), + ConnectedRequest::Announce(r) => announce_requests.push((r, src)), + ConnectedRequest::Scrape(r) => scrape_requests.push((r, src)), } } - let mut connections: MutexGuard = - opt_connections.unwrap_or_else(|| state.connections.lock()); - - handle_connect_requests( - &config, - &mut connections, - &mut std_rng, - connect_requests.drain(..), - &mut responses, - ); - - // Check announce and scrape requests for valid connections - - announce_requests.retain(|(request, src)| { - let connection_valid = - connections.contains_key(&ConnectionKey::new(request.connection_id, *src)); - - if !connection_valid { - responses.push(( - create_invalid_connection_response(request.transaction_id), - *src, - )); - } - - connection_valid - }); - - scrape_requests.retain(|(request, src)| { - let connection_valid = - connections.contains_key(&ConnectionKey::new(request.connection_id, *src)); - - if !connection_valid { - responses.push(( - create_invalid_connection_response(request.transaction_id), - *src, - )); - } - - connection_valid - }); - - ::std::mem::drop(connections); + let mut torrents: MutexGuard = + opt_torrents.unwrap_or_else(|| state.torrents.lock()); // Generate responses for announce and scrape requests - if !(announce_requests.is_empty() && scrape_requests.is_empty()) { - let mut torrents = state.torrents.lock(); + handle_announce_requests( + &config, + &mut torrents, + &mut small_rng, + announce_requests.drain(..), + &mut responses, + ); - handle_announce_requests( - &config, - &mut torrents, - &mut small_rng, - announce_requests.drain(..), - &mut responses, - ); - handle_scrape_requests(&mut torrents, scrape_requests.drain(..), &mut responses); - } + handle_scrape_requests(&mut torrents, scrape_requests.drain(..), &mut responses); for r in responses.drain(..) { if let Err(err) = response_sender.send(r) { @@ -139,10 +92,3 @@ pub fn run_request_worker( } } } - -fn create_invalid_connection_response(transaction_id: TransactionId) -> Response { - Response::Error(ErrorResponse { - transaction_id, - message: "Connection invalid or expired".into(), - }) -} diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 5e32d83..7ccee1c 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -55,7 +55,6 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - tasks::clean_connections(&state); tasks::update_access_list(&config, &state); state.torrents.lock().clean(&config, &state.access_list); diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index b1baf65..ccea096 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -4,12 +4,13 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::vec::Drain; use crossbeam_channel::{Receiver, Sender}; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; +use rand::prelude::{Rng, SeedableRng, StdRng}; use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::{IpVersion, Request, Response}; @@ -21,10 +22,11 @@ pub fn run_socket_worker( state: State, config: Config, token_num: usize, - request_sender: Sender<(Request, SocketAddr)>, + request_sender: Sender<(ConnectedRequest, SocketAddr)>, response_receiver: Receiver<(Response, SocketAddr)>, num_bound_sockets: Arc, ) { + let mut rng = StdRng::from_entropy(); let mut buffer = [0u8; MAX_PACKET_SIZE]; let mut socket = UdpSocket::from_std(create_socket(&config)); @@ -39,8 +41,9 @@ pub fn run_socket_worker( num_bound_sockets.fetch_add(1, Ordering::SeqCst); let mut events = Events::with_capacity(config.network.poll_event_capacity); + let mut connections = ConnectionMap::default(); - let mut requests: Vec<(Request, SocketAddr)> = Vec::new(); + let mut requests: Vec<(ConnectedRequest, SocketAddr)> = Vec::new(); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); let timeout = Duration::from_millis(50); @@ -54,8 +57,10 @@ pub fn run_socket_worker( if (token.0 == token_num) & event.is_readable() { read_requests( - &state, &config, + &state, + &mut connections, + &mut rng, &mut socket, &mut buffer, &mut requests, @@ -83,6 +88,11 @@ pub fn run_socket_worker( &response_receiver, local_responses.drain(..), ); + + let now = Instant::now(); + + connections.retain(|_, v| v.0 > now); + connections.shrink_to_fit(); } } @@ -121,16 +131,19 @@ fn create_socket(config: &Config) -> ::std::net::UdpSocket { #[inline] fn read_requests( - state: &State, config: &Config, + state: &State, + connections: &mut ConnectionMap, + rng: &mut StdRng, socket: &mut UdpSocket, buffer: &mut [u8], - requests: &mut Vec<(Request, SocketAddr)>, + requests: &mut Vec<(ConnectedRequest, SocketAddr)>, local_responses: &mut Vec<(Response, SocketAddr)>, ) { let mut requests_received: usize = 0; let mut bytes_received: usize = 0; + let valid_until = ValidUntil::new(config.cleaning.max_connection_age); let access_list_mode = config.access_list.mode; loop { @@ -146,20 +159,43 @@ fn read_requests( } match request { - Ok(Request::Announce(AnnounceRequest { - info_hash, - transaction_id, - .. - })) if !state.access_list.allows(access_list_mode, &info_hash.0) => { - let response = Response::Error(ErrorResponse { - transaction_id, - message: "Info hash not allowed".into(), + Ok(Request::Connect(request)) => { + let connection_id = ConnectionId(rng.gen()); + + connections.insert(ConnectionKey::new(connection_id, src), valid_until); + + let response = Response::Connect(ConnectResponse { + connection_id, + transaction_id: request.transaction_id, }); local_responses.push((response, src)) } - Ok(request) => { - requests.push((request, src)); + Ok(Request::Announce(request)) => { + let key = ConnectionKey::new(request.connection_id, src); + + if connections.contains_key(&key) { + if state + .access_list + .allows(access_list_mode, &request.info_hash.0) + { + requests.push((ConnectedRequest::Announce(request), src)); + } else { + let response = Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Info hash not allowed".into(), + }); + + local_responses.push((response, src)) + } + } + } + Ok(Request::Scrape(request)) => { + let key = ConnectionKey::new(request.connection_id, src); + + if connections.contains_key(&key) { + requests.push((ConnectedRequest::Scrape(request), src)); + } } Err(err) => { ::log::debug!("request_from_bytes error: {:?}", err); diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/tasks.rs index 83acd3b..44675c9 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/tasks.rs @@ -1,5 +1,4 @@ use std::sync::atomic::Ordering; -use std::time::Instant; use histogram::Histogram; @@ -19,15 +18,6 @@ pub fn update_access_list(config: &Config, state: &State) { } } -pub fn clean_connections(state: &State) { - let now = Instant::now(); - - let mut connections = state.connections.lock(); - - connections.retain(|_, v| v.0 > now); - connections.shrink_to_fit(); -} - pub fn gather_and_print_statistics(state: &State, config: &Config) { let interval = config.statistics.interval; diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index 8288713..c759b97 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::time::{Duration, Instant}; use crossbeam_channel::{Receiver, Sender}; @@ -13,15 +13,14 @@ use crate::common::*; use crate::config::BenchConfig; pub fn bench_announce_handler( - state: &State, bench_config: &BenchConfig, aquatic_config: &Config, - request_sender: &Sender<(Request, SocketAddr)>, + request_sender: &Sender<(ConnectedRequest, SocketAddr)>, response_receiver: &Receiver<(Response, SocketAddr)>, rng: &mut impl Rng, info_hashes: &[InfoHash], ) -> (usize, Duration) { - let requests = create_requests(state, rng, info_hashes, bench_config.num_announce_requests); + let requests = create_requests(rng, info_hashes, bench_config.num_announce_requests); let p = aquatic_config.handlers.max_requests_per_iter * bench_config.num_threads; let mut num_responses = 0usize; @@ -37,7 +36,7 @@ pub fn bench_announce_handler( for round in (0..bench_config.num_rounds).progress_with(pb) { for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { - request_sender.send((request.clone().into(), *src)).unwrap(); + request_sender.send((ConnectedRequest::Announce(request.clone()), *src)).unwrap(); } while let Ok((Response::Announce(r), _)) = response_receiver.try_recv() { @@ -72,7 +71,6 @@ pub fn bench_announce_handler( } pub fn create_requests( - state: &State, rng: &mut impl Rng, info_hashes: &[InfoHash], number: usize, @@ -83,15 +81,11 @@ pub fn create_requests( let mut requests = Vec::new(); - let connections = state.connections.lock(); - - let connection_keys: Vec = connections.keys().take(number).cloned().collect(); - - for connection_key in connection_keys.into_iter() { + for _ in 0..number { let info_hash_index = pareto_usize(rng, pareto, max_index); let request = AnnounceRequest { - connection_id: connection_key.connection_id, + connection_id: ConnectionId(0), transaction_id: TransactionId(rng.gen()), info_hash: info_hashes[info_hash_index], peer_id: PeerId(rng.gen()), @@ -105,7 +99,7 @@ pub fn create_requests( port: Port(rng.gen()), }; - requests.push((request, connection_key.socket_addr)); + requests.push((request, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)))); } requests diff --git a/aquatic_udp_bench/src/connect.rs b/aquatic_udp_bench/src/connect.rs deleted file mode 100644 index 3f3bc05..0000000 --- a/aquatic_udp_bench/src/connect.rs +++ /dev/null @@ -1,80 +0,0 @@ -use std::time::{Duration, Instant}; - -use crossbeam_channel::{Receiver, Sender}; -use indicatif::ProgressIterator; -use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng}; -use std::net::SocketAddr; - -use aquatic_udp::common::*; -use aquatic_udp::config::Config; - -use crate::common::*; -use crate::config::BenchConfig; - -pub fn bench_connect_handler( - bench_config: &BenchConfig, - aquatic_config: &Config, - request_sender: &Sender<(Request, SocketAddr)>, - response_receiver: &Receiver<(Response, SocketAddr)>, -) -> (usize, Duration) { - let requests = create_requests(bench_config.num_connect_requests); - - let p = aquatic_config.handlers.max_requests_per_iter * bench_config.num_threads; - let mut num_responses = 0usize; - - let mut dummy: i64 = thread_rng().gen(); - - let pb = create_progress_bar("Connect", bench_config.num_rounds as u64); - - // Start connect benchmark - - let before = Instant::now(); - - for round in (0..bench_config.num_rounds).progress_with(pb) { - for request_chunk in requests.chunks(p) { - for (request, src) in request_chunk { - request_sender.send((request.clone().into(), *src)).unwrap(); - } - - while let Ok((Response::Connect(r), _)) = response_receiver.try_recv() { - num_responses += 1; - dummy ^= r.connection_id.0; - } - } - - let total = bench_config.num_connect_requests * (round + 1); - - while num_responses < total { - if let Ok((Response::Connect(r), _)) = response_receiver.recv() { - num_responses += 1; - dummy ^= r.connection_id.0; - } - } - } - - let elapsed = before.elapsed(); - - if dummy == 0 { - println!("dummy dummy"); - } - - (num_responses, elapsed) -} - -pub fn create_requests(number: usize) -> Vec<(ConnectRequest, SocketAddr)> { - let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); - - let mut requests = Vec::new(); - - for _ in 0..number { - let request = ConnectRequest { - transaction_id: TransactionId(rng.gen()), - }; - - let src = SocketAddr::from(([rng.gen(), rng.gen(), rng.gen(), rng.gen()], rng.gen())); - - requests.push((request, src)); - } - - requests -} diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index a64cb07..c0313df 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -29,7 +29,6 @@ use config::BenchConfig; mod announce; mod common; mod config; -mod connect; mod scrape; #[global_allocator] @@ -65,18 +64,10 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { // Run benchmarks - let c = connect::bench_connect_handler( - &bench_config, - &aquatic_config, - &request_sender, - &response_receiver, - ); - let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); let info_hashes = create_info_hashes(&mut rng); let a = announce::bench_announce_handler( - &state, &bench_config, &aquatic_config, &request_sender, @@ -86,7 +77,6 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { ); let s = scrape::bench_scrape_handler( - &state, &bench_config, &aquatic_config, &request_sender, @@ -100,7 +90,6 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { bench_config.num_rounds, bench_config.num_threads, ); - print_results("Connect: ", c.0, c.1); print_results("Announce:", a.0, a.1); print_results("Scrape: ", s.0, s.1); diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 09534cc..26a5ad3 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::time::{Duration, Instant}; use crossbeam_channel::{Receiver, Sender}; @@ -13,16 +13,14 @@ use crate::common::*; use crate::config::BenchConfig; pub fn bench_scrape_handler( - state: &State, bench_config: &BenchConfig, aquatic_config: &Config, - request_sender: &Sender<(Request, SocketAddr)>, + request_sender: &Sender<(ConnectedRequest, SocketAddr)>, response_receiver: &Receiver<(Response, SocketAddr)>, rng: &mut impl Rng, info_hashes: &[InfoHash], ) -> (usize, Duration) { let requests = create_requests( - state, rng, info_hashes, bench_config.num_scrape_requests, @@ -43,7 +41,7 @@ pub fn bench_scrape_handler( for round in (0..bench_config.num_rounds).progress_with(pb) { for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { - request_sender.send((request.clone().into(), *src)).unwrap(); + request_sender.send((ConnectedRequest::Scrape(request.clone()), *src)).unwrap(); } while let Ok((Response::Scrape(r), _)) = response_receiver.try_recv() { @@ -78,7 +76,6 @@ pub fn bench_scrape_handler( } pub fn create_requests( - state: &State, rng: &mut impl Rng, info_hashes: &[InfoHash], number: usize, @@ -88,13 +85,9 @@ pub fn create_requests( let max_index = info_hashes.len() - 1; - let connections = state.connections.lock(); - - let connection_keys: Vec = connections.keys().take(number).cloned().collect(); - let mut requests = Vec::new(); - for connection_key in connection_keys.into_iter() { + for _ in 0..number { let mut request_info_hashes = Vec::new(); for _ in 0..hashes_per_request { @@ -103,12 +96,12 @@ pub fn create_requests( } let request = ScrapeRequest { - connection_id: connection_key.connection_id, + connection_id: ConnectionId(0), transaction_id: TransactionId(rng.gen()), info_hashes: request_info_hashes, }; - requests.push((request, connection_key.socket_addr)); + requests.push((request, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)))); } requests From de85feec9a02319013463a241e9aa72f4c7b9ef8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 01:25:04 +0200 Subject: [PATCH 02/15] aquatic_udp: add and use ConnectedResponse enum --- aquatic_udp/src/lib/common.rs | 14 ++++++++++++++ aquatic_udp/src/lib/handlers/announce.rs | 4 ++-- aquatic_udp/src/lib/handlers/mod.rs | 4 ++-- aquatic_udp/src/lib/handlers/scrape.rs | 4 ++-- aquatic_udp/src/lib/network.rs | 12 +++++++----- aquatic_udp_bench/src/announce.rs | 15 ++++++++++----- aquatic_udp_bench/src/scrape.rs | 15 ++++++++++----- 7 files changed, 47 insertions(+), 21 deletions(-) diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index 132666c..3f48017 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -35,6 +35,20 @@ pub enum ConnectedRequest { Scrape(ScrapeRequest), } +pub enum ConnectedResponse { + Announce(AnnounceResponse), + Scrape(ScrapeResponse), +} + +impl Into for ConnectedResponse { + fn into(self) -> Response { + match self { + Self::Announce(response) => Response::Announce(response), + Self::Scrape(response) => Response::Scrape(response), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ConnectionKey { pub connection_id: ConnectionId, diff --git a/aquatic_udp/src/lib/handlers/announce.rs b/aquatic_udp/src/lib/handlers/announce.rs index bda60d6..913a0d6 100644 --- a/aquatic_udp/src/lib/handlers/announce.rs +++ b/aquatic_udp/src/lib/handlers/announce.rs @@ -16,7 +16,7 @@ pub fn handle_announce_requests( torrents: &mut MutexGuard, rng: &mut SmallRng, requests: Drain<(AnnounceRequest, SocketAddr)>, - responses: &mut Vec<(Response, SocketAddr)>, + responses: &mut Vec<(ConnectedResponse, SocketAddr)>, ) { let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); @@ -42,7 +42,7 @@ pub fn handle_announce_requests( ), }; - (Response::Announce(response), src) + (ConnectedResponse::Announce(response), src) })); } diff --git a/aquatic_udp/src/lib/handlers/mod.rs b/aquatic_udp/src/lib/handlers/mod.rs index 5cb7d39..a7597e0 100644 --- a/aquatic_udp/src/lib/handlers/mod.rs +++ b/aquatic_udp/src/lib/handlers/mod.rs @@ -23,12 +23,12 @@ pub fn run_request_worker( state: State, config: Config, request_receiver: Receiver<(ConnectedRequest, SocketAddr)>, - response_sender: Sender<(Response, SocketAddr)>, + response_sender: Sender<(ConnectedResponse, SocketAddr)>, ) { let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); - let mut responses: Vec<(Response, SocketAddr)> = Vec::new(); + let mut responses: Vec<(ConnectedResponse, SocketAddr)> = Vec::new(); let mut std_rng = StdRng::from_entropy(); let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap(); diff --git a/aquatic_udp/src/lib/handlers/scrape.rs b/aquatic_udp/src/lib/handlers/scrape.rs index 8198bd8..b544ccf 100644 --- a/aquatic_udp/src/lib/handlers/scrape.rs +++ b/aquatic_udp/src/lib/handlers/scrape.rs @@ -12,7 +12,7 @@ use crate::common::*; pub fn handle_scrape_requests( torrents: &mut MutexGuard, requests: Drain<(ScrapeRequest, SocketAddr)>, - responses: &mut Vec<(Response, SocketAddr)>, + responses: &mut Vec<(ConnectedResponse, SocketAddr)>, ) { let empty_stats = create_torrent_scrape_statistics(0, 0); @@ -45,7 +45,7 @@ pub fn handle_scrape_requests( } } - let response = Response::Scrape(ScrapeResponse { + let response = ConnectedResponse::Scrape(ScrapeResponse { transaction_id: request.transaction_id, torrent_stats: stats, }); diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index ccea096..c8cd776 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -23,7 +23,7 @@ pub fn run_socket_worker( config: Config, token_num: usize, request_sender: Sender<(ConnectedRequest, SocketAddr)>, - response_receiver: Receiver<(Response, SocketAddr)>, + response_receiver: Receiver<(ConnectedResponse, SocketAddr)>, num_bound_sockets: Arc, ) { let mut rng = StdRng::from_entropy(); @@ -249,7 +249,7 @@ fn send_responses( config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - response_receiver: &Receiver<(Response, SocketAddr)>, + response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, local_responses: Drain<(Response, SocketAddr)>, ) { let mut responses_sent: usize = 0; @@ -257,9 +257,11 @@ fn send_responses( let mut cursor = Cursor::new(buffer); - let response_iterator = local_responses - .into_iter() - .chain(response_receiver.try_iter()); + let response_iterator = local_responses.into_iter().chain( + response_receiver + .try_iter() + .map(|(response, addr)| (response.into(), addr)), + ); for (response, src) in response_iterator { cursor.set_position(0); diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index c759b97..12b35e3 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -16,7 +16,7 @@ pub fn bench_announce_handler( bench_config: &BenchConfig, aquatic_config: &Config, request_sender: &Sender<(ConnectedRequest, SocketAddr)>, - response_receiver: &Receiver<(Response, SocketAddr)>, + response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, rng: &mut impl Rng, info_hashes: &[InfoHash], ) -> (usize, Duration) { @@ -36,10 +36,12 @@ pub fn bench_announce_handler( for round in (0..bench_config.num_rounds).progress_with(pb) { for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { - request_sender.send((ConnectedRequest::Announce(request.clone()), *src)).unwrap(); + request_sender + .send((ConnectedRequest::Announce(request.clone()), *src)) + .unwrap(); } - while let Ok((Response::Announce(r), _)) = response_receiver.try_recv() { + while let Ok((ConnectedResponse::Announce(r), _)) = response_receiver.try_recv() { num_responses += 1; if let Some(last_peer) = r.peers.last() { @@ -51,7 +53,7 @@ pub fn bench_announce_handler( let total = bench_config.num_announce_requests * (round + 1); while num_responses < total { - if let Ok((Response::Announce(r), _)) = response_receiver.recv() { + if let Ok((ConnectedResponse::Announce(r), _)) = response_receiver.recv() { num_responses += 1; if let Some(last_peer) = r.peers.last() { @@ -99,7 +101,10 @@ pub fn create_requests( port: Port(rng.gen()), }; - requests.push((request, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)))); + requests.push(( + request, + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)), + )); } requests diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 26a5ad3..7b62152 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -16,7 +16,7 @@ pub fn bench_scrape_handler( bench_config: &BenchConfig, aquatic_config: &Config, request_sender: &Sender<(ConnectedRequest, SocketAddr)>, - response_receiver: &Receiver<(Response, SocketAddr)>, + response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, rng: &mut impl Rng, info_hashes: &[InfoHash], ) -> (usize, Duration) { @@ -41,10 +41,12 @@ pub fn bench_scrape_handler( for round in (0..bench_config.num_rounds).progress_with(pb) { for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { - request_sender.send((ConnectedRequest::Scrape(request.clone()), *src)).unwrap(); + request_sender + .send((ConnectedRequest::Scrape(request.clone()), *src)) + .unwrap(); } - while let Ok((Response::Scrape(r), _)) = response_receiver.try_recv() { + while let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.try_recv() { num_responses += 1; if let Some(stat) = r.torrent_stats.last() { @@ -56,7 +58,7 @@ pub fn bench_scrape_handler( let total = bench_config.num_scrape_requests * (round + 1); while num_responses < total { - if let Ok((Response::Scrape(r), _)) = response_receiver.recv() { + if let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.recv() { num_responses += 1; if let Some(stat) = r.torrent_stats.last() { @@ -101,7 +103,10 @@ pub fn create_requests( info_hashes: request_info_hashes, }; - requests.push((request, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)))); + requests.push(( + request, + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)), + )); } requests From e2be31c7debb28b103f65d25fa2c53d49c9920bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 01:27:16 +0200 Subject: [PATCH 03/15] aquatic_udp: move ConnectionKey and ConnectionMap to network.rs --- aquatic_udp/src/lib/common.rs | 19 +------------------ aquatic_udp/src/lib/network.rs | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index 3f48017..c89b94f 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -1,5 +1,5 @@ use std::hash::Hash; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Instant; @@ -49,23 +49,6 @@ impl Into for ConnectedResponse { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ConnectionKey { - pub connection_id: ConnectionId, - pub socket_addr: SocketAddr, -} - -impl ConnectionKey { - pub fn new(connection_id: ConnectionId, socket_addr: SocketAddr) -> Self { - Self { - connection_id, - socket_addr, - } - } -} - -pub type ConnectionMap = HashMap; - #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub enum PeerStatus { Seeding, diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index c8cd776..82d1117 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -8,6 +8,7 @@ use std::time::{Duration, Instant}; use std::vec::Drain; use crossbeam_channel::{Receiver, Sender}; +use hashbrown::HashMap; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; use rand::prelude::{Rng, SeedableRng, StdRng}; @@ -18,6 +19,23 @@ use aquatic_udp_protocol::{IpVersion, Request, Response}; use crate::common::*; use crate::config::Config; +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ConnectionKey { + pub connection_id: ConnectionId, + pub socket_addr: SocketAddr, +} + +impl ConnectionKey { + pub fn new(connection_id: ConnectionId, socket_addr: SocketAddr) -> Self { + Self { + connection_id, + socket_addr, + } + } +} + +pub type ConnectionMap = HashMap; + pub fn run_socket_worker( state: State, config: Config, From f64e2e726b13acfcf1f4b5b711b051844c5b2c3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 01:35:49 +0200 Subject: [PATCH 04/15] aquatic_udp: drop MutexGuard quicker; clean up --- aquatic_udp/src/lib/handlers/mod.rs | 34 ++++++++++++----------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/aquatic_udp/src/lib/handlers/mod.rs b/aquatic_udp/src/lib/handlers/mod.rs index a7597e0..0634702 100644 --- a/aquatic_udp/src/lib/handlers/mod.rs +++ b/aquatic_udp/src/lib/handlers/mod.rs @@ -2,11 +2,7 @@ use std::net::SocketAddr; use std::time::Duration; use crossbeam_channel::{Receiver, Sender}; -use parking_lot::MutexGuard; -use rand::{ - rngs::{SmallRng, StdRng}, - SeedableRng, -}; +use rand::{rngs::SmallRng, SeedableRng}; use aquatic_udp_protocol::*; @@ -27,11 +23,9 @@ pub fn run_request_worker( ) { let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); - let mut responses: Vec<(ConnectedResponse, SocketAddr)> = Vec::new(); - let mut std_rng = StdRng::from_entropy(); - let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap(); + let mut small_rng = SmallRng::from_entropy(); let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds); @@ -70,20 +64,20 @@ pub fn run_request_worker( } } - let mut torrents: MutexGuard = - opt_torrents.unwrap_or_else(|| state.torrents.lock()); + // Generate responses for announce and scrape requests, then drop MutexGuard. + { + let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock()); - // Generate responses for announce and scrape requests + handle_announce_requests( + &config, + &mut torrents, + &mut small_rng, + announce_requests.drain(..), + &mut responses, + ); - handle_announce_requests( - &config, - &mut torrents, - &mut small_rng, - announce_requests.drain(..), - &mut responses, - ); - - handle_scrape_requests(&mut torrents, scrape_requests.drain(..), &mut responses); + handle_scrape_requests(&mut torrents, scrape_requests.drain(..), &mut responses); + } for r in responses.drain(..) { if let Err(err) = response_sender.send(r) { From bfab6bb48f5df24427a3b7302528d5cefd863168 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 01:39:20 +0200 Subject: [PATCH 05/15] Update TODO --- TODO.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/TODO.md b/TODO.md index 45e45d5..dc6c3e0 100644 --- a/TODO.md +++ b/TODO.md @@ -1,11 +1,10 @@ # TODO +* aquatic_udp: clean ConnectionMap less often + * access lists: * use arc-swap Cache - * test functionality - * aquatic_udp - * aquatic_http - * aquatic_ws, including sending back new error responses + * add CI tests * aquatic_ws: should it send back error on message parse error, or does that just indicate that not enough data has been received yet? From f0a15e9b6f704970ee5c9f12dd4704de5e1b37ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 02:10:39 +0200 Subject: [PATCH 06/15] aquatic_udp: improve request parse errors, send less error responses --- Cargo.lock | 1 + aquatic_udp/src/lib/network.rs | 21 ++--- aquatic_udp_protocol/Cargo.toml | 1 + aquatic_udp_protocol/src/request.rs | 122 +++++++++++++++------------- 4 files changed, 78 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f1dcab0..c8f0e2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,6 +216,7 @@ name = "aquatic_udp_protocol" version = "0.1.0" dependencies = [ "byteorder", + "either", "quickcheck", "quickcheck_macros", ] diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index 82d1117..0ce505d 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -216,21 +216,18 @@ fn read_requests( } } Err(err) => { - ::log::debug!("request_from_bytes error: {:?}", err); + ::log::debug!("Request::from_bytes error: {:?}", err); - if let Some(transaction_id) = err.transaction_id { - let opt_message = if err.error.is_some() { - Some("Parse error".into()) - } else if let Some(message) = err.message { - Some(message.into()) - } else { - None - }; - - if let Some(message) = opt_message { + if let RequestParseError::Sendable { + connection_id, + transaction_id, + err, + } = err + { + if connections.contains_key(&ConnectionKey::new(connection_id, src)) { let response = ErrorResponse { transaction_id, - message, + message: err.right_or("Parse error").into(), }; local_responses.push((response.into(), src)); diff --git a/aquatic_udp_protocol/Cargo.toml b/aquatic_udp_protocol/Cargo.toml index 12d11ce..3cf3f43 100644 --- a/aquatic_udp_protocol/Cargo.toml +++ b/aquatic_udp_protocol/Cargo.toml @@ -9,6 +9,7 @@ repository = "https://github.com/greatest-ape/aquatic" [dependencies] byteorder = "1" +either = "1" [dev-dependencies] quickcheck = "1.0" diff --git a/aquatic_udp_protocol/src/request.rs b/aquatic_udp_protocol/src/request.rs index d88ad8d..8d74196 100644 --- a/aquatic_udp_protocol/src/request.rs +++ b/aquatic_udp_protocol/src/request.rs @@ -3,6 +3,7 @@ use std::io::{self, Cursor, Read, Write}; use std::net::Ipv4Addr; use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt}; +use either::Either; use super::common::*; @@ -67,32 +68,40 @@ pub struct ScrapeRequest { } #[derive(Debug)] -pub struct RequestParseError { - pub transaction_id: Option, - pub message: Option, - pub error: Option, +pub enum RequestParseError { + Sendable { + connection_id: ConnectionId, + transaction_id: TransactionId, + err: Either, + }, + Unsendable { + err: Either, + }, } impl RequestParseError { - pub fn new(err: io::Error, transaction_id: i32) -> Self { - Self { - transaction_id: Some(TransactionId(transaction_id)), - message: None, - error: Some(err), + pub fn sendable_io(err: io::Error, connection_id: i64, transaction_id: i32) -> Self { + Self::Sendable { + connection_id: ConnectionId(connection_id), + transaction_id: TransactionId(transaction_id), + err: Either::Left(err), } } - pub fn io(err: io::Error) -> Self { - Self { - transaction_id: None, - message: None, - error: Some(err), + pub fn sendable_text(text: &'static str, connection_id: i64, transaction_id: i32) -> Self { + Self::Sendable { + connection_id: ConnectionId(connection_id), + transaction_id: TransactionId(transaction_id), + err: Either::Right(text), } } - pub fn text(transaction_id: i32, message: &str) -> Self { - Self { - transaction_id: Some(TransactionId(transaction_id)), - message: Some(message.to_string()), - error: None, + pub fn unsendable_io(err: io::Error) -> Self { + Self::Unsendable { + err: Either::Left(err), + } + } + pub fn unsendable_text(text: &'static str) -> Self { + Self::Unsendable { + err: Either::Right(text), } } } @@ -171,13 +180,13 @@ impl Request { let connection_id = cursor .read_i64::() - .map_err(RequestParseError::io)?; + .map_err(RequestParseError::unsendable_io)?; let action = cursor .read_i32::() - .map_err(RequestParseError::io)?; + .map_err(RequestParseError::unsendable_io)?; let transaction_id = cursor .read_i32::() - .map_err(RequestParseError::io)?; + .map_err(RequestParseError::unsendable_io)?; match action { // Connect @@ -188,8 +197,7 @@ impl Request { }) .into()) } else { - Err(RequestParseError::text( - transaction_id, + Err(RequestParseError::unsendable_text( "Protocol identifier missing", )) } @@ -201,39 +209,39 @@ impl Request { let mut peer_id = [0; 20]; let mut ip = [0; 4]; - cursor - .read_exact(&mut info_hash) - .map_err(|err| RequestParseError::new(err, transaction_id))?; - cursor - .read_exact(&mut peer_id) - .map_err(|err| RequestParseError::new(err, transaction_id))?; + cursor.read_exact(&mut info_hash).map_err(|err| { + RequestParseError::sendable_io(err, connection_id, transaction_id) + })?; + cursor.read_exact(&mut peer_id).map_err(|err| { + RequestParseError::sendable_io(err, connection_id, transaction_id) + })?; - let bytes_downloaded = cursor - .read_i64::() - .map_err(|err| RequestParseError::new(err, transaction_id))?; - let bytes_left = cursor - .read_i64::() - .map_err(|err| RequestParseError::new(err, transaction_id))?; - let bytes_uploaded = cursor - .read_i64::() - .map_err(|err| RequestParseError::new(err, transaction_id))?; - let event = cursor - .read_i32::() - .map_err(|err| RequestParseError::new(err, transaction_id))?; + let bytes_downloaded = cursor.read_i64::().map_err(|err| { + RequestParseError::sendable_io(err, connection_id, transaction_id) + })?; + let bytes_left = cursor.read_i64::().map_err(|err| { + RequestParseError::sendable_io(err, connection_id, transaction_id) + })?; + let bytes_uploaded = cursor.read_i64::().map_err(|err| { + RequestParseError::sendable_io(err, connection_id, transaction_id) + })?; + let event = cursor.read_i32::().map_err(|err| { + RequestParseError::sendable_io(err, connection_id, transaction_id) + })?; - cursor - .read_exact(&mut ip) - .map_err(|err| RequestParseError::new(err, transaction_id))?; + cursor.read_exact(&mut ip).map_err(|err| { + RequestParseError::sendable_io(err, connection_id, transaction_id) + })?; - let key = cursor - .read_u32::() - .map_err(|err| RequestParseError::new(err, transaction_id))?; - let peers_wanted = cursor - .read_i32::() - .map_err(|err| RequestParseError::new(err, transaction_id))?; - let port = cursor - .read_u16::() - .map_err(|err| RequestParseError::new(err, transaction_id))?; + let key = cursor.read_u32::().map_err(|err| { + RequestParseError::sendable_io(err, connection_id, transaction_id) + })?; + let peers_wanted = cursor.read_i32::().map_err(|err| { + RequestParseError::sendable_io(err, connection_id, transaction_id) + })?; + let port = cursor.read_u16::().map_err(|err| { + RequestParseError::sendable_io(err, connection_id, transaction_id) + })?; let opt_ip = if ip == [0; 4] { None @@ -277,7 +285,11 @@ impl Request { .into()) } - _ => Err(RequestParseError::text(transaction_id, "Invalid action")), + _ => Err(RequestParseError::sendable_text( + "Invalid action", + connection_id, + transaction_id, + )), } } } From 6d834e772d3077aa186b13b9d496fd546967edb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 02:22:40 +0200 Subject: [PATCH 07/15] aquatic_udp: simplify ConnectionMap interface --- aquatic_udp/src/lib/network.rs | 51 +++++++++++++++++----------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index 0ce505d..8ab5d2f 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -19,23 +19,31 @@ use aquatic_udp_protocol::{IpVersion, Request, Response}; use crate::common::*; use crate::config::Config; -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ConnectionKey { - pub connection_id: ConnectionId, - pub socket_addr: SocketAddr, -} +#[derive(Default)] +struct ConnectionMap(HashMap<(ConnectionId, SocketAddr), ValidUntil>); -impl ConnectionKey { - pub fn new(connection_id: ConnectionId, socket_addr: SocketAddr) -> Self { - Self { - connection_id, - socket_addr, - } +impl ConnectionMap { + fn insert( + &mut self, + connection_id: ConnectionId, + socket_addr: SocketAddr, + valid_until: ValidUntil, + ) { + self.0.insert((connection_id, socket_addr), valid_until); + } + + fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { + self.0.contains_key(&(connection_id, socket_addr)) + } + + fn clean(&mut self) { + let now = Instant::now(); + + self.0.retain(|_, v| v.0 > now); + self.0.shrink_to_fit(); } } -pub type ConnectionMap = HashMap; - pub fn run_socket_worker( state: State, config: Config, @@ -107,10 +115,7 @@ pub fn run_socket_worker( local_responses.drain(..), ); - let now = Instant::now(); - - connections.retain(|_, v| v.0 > now); - connections.shrink_to_fit(); + connections.clean(); } } @@ -180,7 +185,7 @@ fn read_requests( Ok(Request::Connect(request)) => { let connection_id = ConnectionId(rng.gen()); - connections.insert(ConnectionKey::new(connection_id, src), valid_until); + connections.insert(connection_id, src, valid_until); let response = Response::Connect(ConnectResponse { connection_id, @@ -190,9 +195,7 @@ fn read_requests( local_responses.push((response, src)) } Ok(Request::Announce(request)) => { - let key = ConnectionKey::new(request.connection_id, src); - - if connections.contains_key(&key) { + if connections.contains(request.connection_id, src) { if state .access_list .allows(access_list_mode, &request.info_hash.0) @@ -209,9 +212,7 @@ fn read_requests( } } Ok(Request::Scrape(request)) => { - let key = ConnectionKey::new(request.connection_id, src); - - if connections.contains_key(&key) { + if connections.contains(request.connection_id, src) { requests.push((ConnectedRequest::Scrape(request), src)); } } @@ -224,7 +225,7 @@ fn read_requests( err, } = err { - if connections.contains_key(&ConnectionKey::new(connection_id, src)) { + if connections.contains(connection_id, src) { let response = ErrorResponse { transaction_id, message: err.right_or("Parse error").into(), From 1f763e63e452b228534964270e783e10c076d571 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 02:30:49 +0200 Subject: [PATCH 08/15] aquatic_udp: clean ConnectionMap less often --- TODO.md | 2 -- aquatic_udp/src/lib/network.rs | 10 +++++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/TODO.md b/TODO.md index dc6c3e0..ea65cc2 100644 --- a/TODO.md +++ b/TODO.md @@ -1,7 +1,5 @@ # TODO -* aquatic_udp: clean ConnectionMap less often - * access lists: * use arc-swap Cache * add CI tests diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index 8ab5d2f..82b187f 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -74,6 +74,8 @@ pub fn run_socket_worker( let timeout = Duration::from_millis(50); + let mut iter_counter = 0usize; + loop { poll.poll(&mut events, Some(timeout)) .expect("failed polling"); @@ -115,7 +117,13 @@ pub fn run_socket_worker( local_responses.drain(..), ); - connections.clean(); + iter_counter += 1; + + if iter_counter == 1000 { + connections.clean(); + + iter_counter = 0; + } } } From 7187190cec49bba9b7b629c378ef90b702f2b1e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 02:38:03 +0200 Subject: [PATCH 09/15] aquatic_udp: don't panic on Response::write error --- aquatic_udp/src/lib/network.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index 82b187f..8901da5 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -292,21 +292,26 @@ fn send_responses( let ip_version = ip_version_from_ip(src.ip()); - response.write(&mut cursor, ip_version).unwrap(); + match response.write(&mut cursor, ip_version) { + Ok(()) => { + let amt = cursor.position() as usize; - let amt = cursor.position() as usize; + match socket.send_to(&cursor.get_ref()[..amt], src) { + Ok(amt) => { + responses_sent += 1; + bytes_sent += amt; + } + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + break; + } - match socket.send_to(&cursor.get_ref()[..amt], src) { - Ok(amt) => { - responses_sent += 1; - bytes_sent += amt; - } - Err(err) => { - if err.kind() == ErrorKind::WouldBlock { - break; + ::log::info!("send_to error: {}", err); + } } - - ::log::info!("send_to error: {}", err); + }, + Err(err) => { + ::log::error!("Response::write error: {:?}", err); } } } From 253497bb740b55ac246920a4563ba676203ed623 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 02:46:45 +0200 Subject: [PATCH 10/15] aquatic_udp: update configuration documentation --- aquatic_udp/src/lib/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index 5ce9685..981095e 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -84,7 +84,7 @@ pub struct StatisticsConfig { #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct CleaningConfig { - /// Clean torrents and connections this often (seconds) + /// Update access list and clean torrents this often (seconds) pub interval: u64, /// Remove peers that haven't announced for this long (seconds) pub max_peer_age: u64, From 6b8616acf9eb41f2cea650374c32e6dc08372483 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 02:46:58 +0200 Subject: [PATCH 11/15] aquatic_udp: pass less of state to tasks::update_access_list --- aquatic_udp/src/lib/lib.rs | 4 ++-- aquatic_udp/src/lib/tasks.rs | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 7ccee1c..a740a3e 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -23,7 +23,7 @@ pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { let state = State::default(); - tasks::update_access_list(&config, &state); + tasks::update_access_list(&config, &state.access_list); let num_bound_sockets = start_workers(config.clone(), state.clone())?; @@ -55,7 +55,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - tasks::update_access_list(&config, &state); + tasks::update_access_list(&config, &state.access_list); state.torrents.lock().clean(&config, &state.access_list); } diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/tasks.rs index 44675c9..13517c4 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/tasks.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::sync::atomic::Ordering; use histogram::Histogram; @@ -7,10 +8,10 @@ use aquatic_common::access_list::AccessListMode; use crate::common::*; use crate::config::Config; -pub fn update_access_list(config: &Config, state: &State) { +pub fn update_access_list(config: &Config, access_list: &Arc) { match config.access_list.mode { AccessListMode::White | AccessListMode::Black => { - if let Err(err) = state.access_list.update_from_path(&config.access_list.path) { + if let Err(err) = access_list.update_from_path(&config.access_list.path) { ::log::error!("Update access list from path: {:?}", err); } } From 07a453b6b30d5d6ff241c7ba10cfd41effc950e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 11:06:07 +0200 Subject: [PATCH 12/15] aquatic_udp: in network request reading, send on to channel directly --- aquatic_udp/src/lib/network.rs | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index 8901da5..3daad2e 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -69,7 +69,6 @@ pub fn run_socket_worker( let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut connections = ConnectionMap::default(); - let mut requests: Vec<(ConnectedRequest, SocketAddr)> = Vec::new(); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); let timeout = Duration::from_millis(50); @@ -91,16 +90,10 @@ pub fn run_socket_worker( &mut rng, &mut socket, &mut buffer, - &mut requests, + &request_sender, &mut local_responses, ); - for r in requests.drain(..) { - if let Err(err) = request_sender.send(r) { - ::log::error!("error sending to request_sender: {}", err); - } - } - state .statistics .readable_events @@ -168,7 +161,7 @@ fn read_requests( rng: &mut StdRng, socket: &mut UdpSocket, buffer: &mut [u8], - requests: &mut Vec<(ConnectedRequest, SocketAddr)>, + request_sender: &Sender<(ConnectedRequest, SocketAddr)>, local_responses: &mut Vec<(Response, SocketAddr)>, ) { let mut requests_received: usize = 0; @@ -208,7 +201,11 @@ fn read_requests( .access_list .allows(access_list_mode, &request.info_hash.0) { - requests.push((ConnectedRequest::Announce(request), src)); + if let Err(err) = + request_sender.send((ConnectedRequest::Announce(request), src)) + { + ::log::warn!("request_sender.send failed: {:?}", err) + } } else { let response = Response::Error(ErrorResponse { transaction_id: request.transaction_id, @@ -221,7 +218,11 @@ fn read_requests( } Ok(Request::Scrape(request)) => { if connections.contains(request.connection_id, src) { - requests.push((ConnectedRequest::Scrape(request), src)); + if let Err(err) = + request_sender.send((ConnectedRequest::Scrape(request), src)) + { + ::log::warn!("request_sender.send failed: {:?}", err) + } } } Err(err) => { @@ -309,7 +310,7 @@ fn send_responses( ::log::info!("send_to error: {}", err); } } - }, + } Err(err) => { ::log::error!("Response::write error: {:?}", err); } From 90d60108c127654faeae8b400f7c95e2c75e7b92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 11:10:56 +0200 Subject: [PATCH 13/15] aquatic_udp: innetwork, use request_sender.try_send; run rustfmt --- aquatic_udp/src/lib/network.rs | 10 +++++----- aquatic_udp/src/lib/tasks.rs | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index 3daad2e..b2403e2 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -201,10 +201,10 @@ fn read_requests( .access_list .allows(access_list_mode, &request.info_hash.0) { - if let Err(err) = - request_sender.send((ConnectedRequest::Announce(request), src)) + if let Err(err) = request_sender + .try_send((ConnectedRequest::Announce(request), src)) { - ::log::warn!("request_sender.send failed: {:?}", err) + ::log::warn!("request_sender.try_send failed: {:?}", err) } } else { let response = Response::Error(ErrorResponse { @@ -219,9 +219,9 @@ fn read_requests( Ok(Request::Scrape(request)) => { if connections.contains(request.connection_id, src) { if let Err(err) = - request_sender.send((ConnectedRequest::Scrape(request), src)) + request_sender.try_send((ConnectedRequest::Scrape(request), src)) { - ::log::warn!("request_sender.send failed: {:?}", err) + ::log::warn!("request_sender.try_send failed: {:?}", err) } } } diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/tasks.rs index 13517c4..54a0ce8 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/tasks.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use std::sync::atomic::Ordering; +use std::sync::Arc; use histogram::Histogram; From 9aa783fbb12cc8ef7f6a16cfd62e60b79a9983b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 11:15:59 +0200 Subject: [PATCH 14/15] aquatic_udp: stop counting readable events for statistics It is not very informative and might be expensive --- aquatic_udp/src/lib/common.rs | 1 - aquatic_udp/src/lib/network.rs | 5 ----- aquatic_udp/src/lib/tasks.rs | 14 ++------------ 3 files changed, 2 insertions(+), 18 deletions(-) diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index c89b94f..d733c52 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -174,7 +174,6 @@ impl TorrentMaps { pub struct Statistics { pub requests_received: AtomicUsize, pub responses_sent: AtomicUsize, - pub readable_events: AtomicUsize, pub bytes_received: AtomicUsize, pub bytes_sent: AtomicUsize, } diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index b2403e2..da6bd2a 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -93,11 +93,6 @@ pub fn run_socket_worker( &request_sender, &mut local_responses, ); - - state - .statistics - .readable_events - .fetch_add(1, Ordering::SeqCst); } } diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/tasks.rs index 54a0ce8..2665c45 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/tasks.rs @@ -41,19 +41,9 @@ pub fn gather_and_print_statistics(state: &State, config: &Config) { let bytes_received_per_second: f64 = bytes_received / interval as f64; let bytes_sent_per_second: f64 = bytes_sent / interval as f64; - let readable_events: f64 = state - .statistics - .readable_events - .fetch_and(0, Ordering::SeqCst) as f64; - let requests_per_readable_event = if readable_events == 0.0 { - 0.0 - } else { - requests_received / readable_events - }; - println!( - "stats: {:.2} requests/second, {:.2} responses/second, {:.2} requests/readable event", - requests_per_second, responses_per_second, requests_per_readable_event + "stats: {:.2} requests/second, {:.2} responses/second", + requests_per_second, responses_per_second ); println!( From fc0c2b7c1c0433d9e607225a8a19c40a2fa418bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 11:35:18 +0200 Subject: [PATCH 15/15] aquatic_udp_bench: update example outputs in top-level comment --- aquatic_udp_bench/src/main.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index c0313df..c89de8b 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -2,16 +2,9 @@ //! //! Example outputs: //! ``` -//! # Results over 20 rounds with 1 threads -//! Connect: 2 306 637 requests/second, 433.53 ns/request -//! Announce: 688 391 requests/second, 1452.66 ns/request -//! Scrape: 1 505 700 requests/second, 664.14 ns/request -//! ``` -//! ``` -//! # Results over 20 rounds with 2 threads -//! Connect: 3 472 434 requests/second, 287.98 ns/request -//! Announce: 739 371 requests/second, 1352.50 ns/request -//! Scrape: 1 845 253 requests/second, 541.93 ns/request +//! # Results over 10 rounds with 2 threads +//! Announce: 429 540 requests/second, 2328.07 ns/request +//! Scrape: 1 873 545 requests/second, 533.75 ns/request //! ``` use crossbeam_channel::unbounded;