diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index a09d225..72f9862 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -1,9 +1,11 @@ +use std::collections::BTreeMap; use std::hash::Hash; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Instant; +use crossbeam_channel::Sender; use parking_lot::Mutex; use socket2::{Domain, Protocol, Socket, Type}; @@ -34,35 +36,29 @@ impl Ip for Ipv6Addr { } } +#[derive(Debug)] +pub struct PendingScrapeRequest { + pub transaction_id: TransactionId, + pub info_hashes: BTreeMap, +} + +#[derive(Debug)] +pub struct PendingScrapeResponse { + pub transaction_id: TransactionId, + pub torrent_stats: BTreeMap, +} + #[derive(Debug)] pub enum ConnectedRequest { Announce(AnnounceRequest), - Scrape { - request: ScrapeRequest, - /// Currently only used by glommio implementation - original_indices: Vec, - }, + Scrape(PendingScrapeRequest), } #[derive(Debug)] pub enum ConnectedResponse { AnnounceIpv4(AnnounceResponseIpv4), AnnounceIpv6(AnnounceResponseIpv6), - Scrape { - response: ScrapeResponse, - /// Currently only used by glommio implementation - original_indices: Vec, - }, -} - -impl Into for ConnectedResponse { - fn into(self) -> Response { - match self { - Self::AnnounceIpv4(response) => Response::AnnounceIpv4(response), - Self::AnnounceIpv6(response) => Response::AnnounceIpv6(response), - Self::Scrape { response, .. } => Response::Scrape(response), - } - } + Scrape(PendingScrapeResponse), } #[derive(Clone, PartialEq, Debug)] @@ -117,6 +113,64 @@ impl Into for ProtocolAnnounceResponse { } } +#[derive(Clone, Copy, Debug)] +pub struct SocketWorkerIndex(pub usize); + +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub struct RequestWorkerIndex(pub usize); + +impl RequestWorkerIndex { + fn from_info_hash(config: &Config, info_hash: InfoHash) -> Self { + Self(info_hash.0[0] as usize % config.request_workers) + } +} + +pub struct ConnectedRequestSender { + index: SocketWorkerIndex, + senders: Vec>, +} + +impl ConnectedRequestSender { + pub fn new( + index: SocketWorkerIndex, + senders: Vec>, + ) -> Self { + Self { index, senders } + } + + pub fn try_send_to( + &self, + index: RequestWorkerIndex, + request: ConnectedRequest, + addr: SocketAddr, + ) { + if let Err(err) = self.senders[index.0].try_send((self.index, request, addr)) { + ::log::warn!("request_sender.try_send failed: {:?}", err) + } + } +} + +pub struct ConnectedResponseSender { + senders: Vec>, +} + +impl ConnectedResponseSender { + pub fn new(senders: Vec>) -> Self { + Self { senders } + } + + pub fn try_send_to( + &self, + index: SocketWorkerIndex, + response: ConnectedResponse, + addr: SocketAddr, + ) { + if let Err(err) = self.senders[index.0].try_send((response, addr)) { + ::log::warn!("request_sender.try_send failed: {:?}", err) + } + } +} + #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub enum PeerStatus { Seeding, diff --git a/aquatic_udp/src/lib/common/network.rs b/aquatic_udp/src/lib/common/network.rs index 053a7da..ee3ca4f 100644 --- a/aquatic_udp/src/lib/common/network.rs +++ b/aquatic_udp/src/lib/common/network.rs @@ -4,7 +4,6 @@ use aquatic_common::access_list::AccessListCache; use aquatic_common::AHashIndexMap; use aquatic_common::ValidUntil; use aquatic_udp_protocol::*; -use crossbeam_channel::Sender; use rand::{prelude::StdRng, Rng}; use crate::common::*; @@ -34,12 +33,75 @@ impl ConnectionMap { } } +pub struct PendingScrapeResponseMeta { + num_pending: usize, + valid_until: ValidUntil, +} + +#[derive(Default)] +pub struct PendingScrapeResponseMap( + AHashIndexMap, +); + +impl PendingScrapeResponseMap { + pub fn prepare( + &mut self, + transaction_id: TransactionId, + num_pending: usize, + valid_until: ValidUntil, + ) { + let meta = PendingScrapeResponseMeta { + num_pending, + valid_until, + }; + let response = PendingScrapeResponse { + transaction_id, + torrent_stats: BTreeMap::new(), + }; + + self.0.insert(transaction_id, (meta, response)); + } + + pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option { + let finished = if let Some(r) = self.0.get_mut(&response.transaction_id) { + r.0.num_pending -= 1; + + r.1.torrent_stats.extend(response.torrent_stats.into_iter()); + + r.0.num_pending == 0 + } else { + ::log::warn!("PendingScrapeResponses.add didn't find PendingScrapeResponse in map"); + + false + }; + + if finished { + let response = self.0.remove(&response.transaction_id).unwrap().1; + + Some(Response::Scrape(ScrapeResponse { + transaction_id: response.transaction_id, + torrent_stats: response.torrent_stats.into_values().collect(), + })) + } else { + None + } + } + + pub fn clean(&mut self) { + let now = Instant::now(); + + self.0.retain(|_, v| v.0.valid_until.0 > now); + self.0.shrink_to_fit(); + } +} + pub fn handle_request( config: &Config, connections: &mut ConnectionMap, + pending_scrape_responses: &mut PendingScrapeResponseMap, access_list_cache: &mut AccessListCache, rng: &mut StdRng, - request_sender: &Sender<(ConnectedRequest, SocketAddr)>, + request_sender: &ConnectedRequestSender, local_responses: &mut Vec<(Response, SocketAddr)>, valid_until: ValidUntil, res_request: Result, @@ -66,11 +128,14 @@ pub fn handle_request( .load() .allows(access_list_mode, &request.info_hash.0) { - if let Err(err) = - request_sender.try_send((ConnectedRequest::Announce(request), src)) - { - ::log::warn!("request_sender.try_send failed: {:?}", err) - } + let worker_index = + RequestWorkerIndex::from_info_hash(config, request.info_hash); + + request_sender.try_send_to( + worker_index, + ConnectedRequest::Announce(request), + src, + ); } else { let response = Response::Error(ErrorResponse { transaction_id: request.transaction_id, @@ -83,13 +148,30 @@ pub fn handle_request( } Ok(Request::Scrape(request)) => { if connections.contains(request.connection_id, src) { - let request = ConnectedRequest::Scrape { - request, - original_indices: Vec::new(), - }; + let mut requests: AHashIndexMap = + Default::default(); - if let Err(err) = request_sender.try_send((request, src)) { - ::log::warn!("request_sender.try_send failed: {:?}", err) + let transaction_id = request.transaction_id; + + for (i, info_hash) in request.info_hashes.into_iter().enumerate() { + let pending = requests + .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) + .or_insert_with(|| PendingScrapeRequest { + transaction_id, + info_hashes: BTreeMap::new(), + }); + + pending.info_hashes.insert(i, info_hash); + } + + pending_scrape_responses.prepare(transaction_id, requests.len(), valid_until); + + for (request_worker_index, request) in requests { + request_sender.try_send_to( + request_worker_index, + ConnectedRequest::Scrape(request), + src, + ); } } } diff --git a/aquatic_udp/src/lib/handlers.rs b/aquatic_udp/src/lib/handlers.rs index d23e7ab..44c934d 100644 --- a/aquatic_udp/src/lib/handlers.rs +++ b/aquatic_udp/src/lib/handlers.rs @@ -1,12 +1,12 @@ +use std::collections::BTreeMap; use std::net::IpAddr; use std::net::SocketAddr; use std::time::Duration; use aquatic_common::ValidUntil; -use crossbeam_channel::{Receiver, Sender}; +use crossbeam_channel::Receiver; use rand::{rngs::SmallRng, SeedableRng}; -use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_common::extract_response_peers; use aquatic_udp_protocol::*; @@ -15,88 +15,37 @@ use crate::common::*; use crate::config::Config; pub fn run_request_worker( - state: State, config: Config, - request_receiver: Receiver<(ConnectedRequest, SocketAddr)>, - response_sender: Sender<(ConnectedResponse, SocketAddr)>, + request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>, + response_sender: ConnectedResponseSender, ) { - let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); - let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); - let mut responses: Vec<(ConnectedResponse, SocketAddr)> = Vec::new(); - + let mut torrents = TorrentMaps::default(); let mut small_rng = SmallRng::from_entropy(); let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds); loop { - let mut opt_torrents = None; - - // Collect requests from channel, divide them by type - // - // Collect a maximum number of request. Stop collecting before that - // number is reached if having waited for too long for a request, but - // only if TorrentMaps mutex isn't locked. - for i in 0..config.handlers.max_requests_per_iter { - let (request, src): (ConnectedRequest, SocketAddr) = if i == 0 { - match request_receiver.recv() { - Ok(r) => r, - Err(_) => break, // Really shouldn't happen - } - } else { - match request_receiver.recv_timeout(timeout) { - Ok(r) => r, - Err(_) => { - if let Some(guard) = state.torrents.try_lock() { - opt_torrents = Some(guard); - - break; - } else { - continue; - } - } - } - }; - - match request { - ConnectedRequest::Announce(request) => announce_requests.push((request, src)), - ConnectedRequest::Scrape { request, .. } => scrape_requests.push((request, src)), - } - } - - // Generate responses for announce and scrape requests, then drop MutexGuard. - { - let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock()); - + if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) { let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - responses.extend(announce_requests.drain(..).map(|(request, src)| { - let response = handle_announce_request( + let response = match request { + ConnectedRequest::Announce(request) => handle_announce_request( &config, &mut small_rng, &mut torrents, request, src, peer_valid_until, - ); + ), + ConnectedRequest::Scrape(request) => { + ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)) + } + }; - (response, src) - })); - - responses.extend(scrape_requests.drain(..).map(|(request, src)| { - let response = ConnectedResponse::Scrape { - response: handle_scrape_request(&mut torrents, src, request), - original_indices: Vec::new(), - }; - - (response, src) - })); + response_sender.try_send_to(sender_index, response, src); } - for r in responses.drain(..) { - if let Err(err) = response_sender.send(r) { - ::log::error!("error sending response to channel: {}", err); - } - } + // TODO: clean torrent map, update peer_valid_until } } @@ -207,41 +156,43 @@ fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { pub fn handle_scrape_request( torrents: &mut TorrentMaps, src: SocketAddr, - request: ScrapeRequest, -) -> ScrapeResponse { + request: PendingScrapeRequest, +) -> PendingScrapeResponse { const EMPTY_STATS: TorrentScrapeStatistics = create_torrent_scrape_statistics(0, 0); - let mut stats: Vec = Vec::with_capacity(request.info_hashes.len()); + let mut torrent_stats: BTreeMap = BTreeMap::new(); - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - - if peer_ip.is_ipv4() { - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.ipv4.get(info_hash) { - stats.push(create_torrent_scrape_statistics( + if src.ip().is_ipv4() { + torrent_stats.extend(request.info_hashes.into_iter().map(|(i, info_hash)| { + let s = if let Some(torrent_data) = torrents.ipv4.get(&info_hash) { + create_torrent_scrape_statistics( torrent_data.num_seeders as i32, torrent_data.num_leechers as i32, - )); + ) } else { - stats.push(EMPTY_STATS); - } - } + EMPTY_STATS + }; + + (i, s) + })); } else { - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.ipv6.get(info_hash) { - stats.push(create_torrent_scrape_statistics( + torrent_stats.extend(request.info_hashes.into_iter().map(|(i, info_hash)| { + let s = if let Some(torrent_data) = torrents.ipv6.get(&info_hash) { + create_torrent_scrape_statistics( torrent_data.num_seeders as i32, torrent_data.num_leechers as i32, - )); + ) } else { - stats.push(EMPTY_STATS); - } - } + EMPTY_STATS + }; + + (i, s) + })); } - ScrapeResponse { + PendingScrapeResponse { transaction_id: request.transaction_id, - torrent_stats: stats, + torrent_stats, } } diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index a08b862..e321e95 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -9,6 +9,7 @@ pub mod tasks; use config::Config; +use std::collections::BTreeMap; use std::sync::{atomic::AtomicUsize, Arc}; use std::thread::Builder; use std::time::Duration; @@ -23,7 +24,7 @@ use aquatic_common::access_list::update_access_list; use signal_hook::consts::SIGUSR1; use signal_hook::iterator::Signals; -use common::State; +use common::{ConnectedRequestSender, ConnectedResponseSender, SocketWorkerIndex, State}; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; @@ -63,14 +64,30 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - let (request_sender, request_receiver) = unbounded(); - let (response_sender, response_receiver) = unbounded(); + let mut request_senders = Vec::new(); + let mut request_receivers = BTreeMap::new(); + + let mut response_senders = Vec::new(); + let mut response_receivers = BTreeMap::new(); + + for i in 0..config.request_workers { + let (request_sender, request_receiver) = unbounded(); + + request_senders.push(request_sender); + request_receivers.insert(i, request_receiver); + } + + for i in 0..config.socket_workers { + let (response_sender, response_receiver) = unbounded(); + + response_senders.push(response_sender); + response_receivers.insert(i, response_receiver); + } for i in 0..config.request_workers { - let state = state.clone(); let config = config.clone(); - let request_receiver = request_receiver.clone(); - let response_sender = response_sender.clone(); + let request_receiver = request_receivers.remove(&i).unwrap().clone(); + let response_sender = ConnectedResponseSender::new(response_senders.clone()); Builder::new() .name(format!("request-{:02}", i + 1)) @@ -82,7 +99,7 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { WorkerIndex::RequestWorker(i), ); - handlers::run_request_worker(state, config, request_receiver, response_sender) + handlers::run_request_worker(config, request_receiver, response_sender) }) .with_context(|| "spawn request worker")?; } @@ -90,8 +107,9 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { for i in 0..config.socket_workers { let state = state.clone(); let config = config.clone(); - let request_sender = request_sender.clone(); - let response_receiver = response_receiver.clone(); + let request_sender = + ConnectedRequestSender::new(SocketWorkerIndex(i), request_senders.clone()); + let response_receiver = response_receivers.remove(&i).unwrap(); let num_bound_sockets = num_bound_sockets.clone(); Builder::new() @@ -128,6 +146,12 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { .with_context(|| "spawn socket worker")?; } + ::std::mem::drop(request_senders); + ::std::mem::drop(request_receivers); + + ::std::mem::drop(response_senders); + ::std::mem::drop(response_receivers); + if config.statistics.interval != 0 { let state = state.clone(); let config = config.clone(); diff --git a/aquatic_udp/src/lib/network_mio.rs b/aquatic_udp/src/lib/network_mio.rs index 35769cf..5e4cac1 100644 --- a/aquatic_udp/src/lib/network_mio.rs +++ b/aquatic_udp/src/lib/network_mio.rs @@ -9,7 +9,7 @@ use std::vec::Drain; use aquatic_common::access_list::create_access_list_cache; use aquatic_common::ValidUntil; -use crossbeam_channel::{Receiver, Sender}; +use crossbeam_channel::Receiver; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; use rand::prelude::{SeedableRng, StdRng}; @@ -24,7 +24,7 @@ pub fn run_socket_worker( state: State, config: Config, token_num: usize, - request_sender: Sender<(ConnectedRequest, SocketAddr)>, + request_sender: ConnectedRequestSender, response_receiver: Receiver<(ConnectedResponse, SocketAddr)>, num_bound_sockets: Arc, ) { @@ -44,6 +44,7 @@ pub fn run_socket_worker( let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut connections = ConnectionMap::default(); + let mut pending_scrape_responses = PendingScrapeResponseMap::default(); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); @@ -66,6 +67,7 @@ pub fn run_socket_worker( &config, &state, &mut connections, + &mut pending_scrape_responses, &mut rng, &mut socket, &mut buffer, @@ -81,6 +83,7 @@ pub fn run_socket_worker( &mut socket, &mut buffer, &response_receiver, + &mut pending_scrape_responses, local_responses.drain(..), ); @@ -103,10 +106,11 @@ fn read_requests( config: &Config, state: &State, connections: &mut ConnectionMap, + pending_scrape_responses: &mut PendingScrapeResponseMap, rng: &mut StdRng, socket: &mut UdpSocket, buffer: &mut [u8], - request_sender: &Sender<(ConnectedRequest, SocketAddr)>, + request_sender: &ConnectedRequestSender, local_responses: &mut Vec<(Response, SocketAddr)>, ) { let mut requests_received: usize = 0; @@ -147,6 +151,7 @@ fn read_requests( handle_request( config, connections, + pending_scrape_responses, &mut access_list_cache, rng, request_sender, @@ -185,60 +190,41 @@ fn send_responses( socket: &mut UdpSocket, buffer: &mut [u8], response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, + pending_scrape_responses: &mut PendingScrapeResponseMap, local_responses: Drain<(Response, SocketAddr)>, ) { let mut responses_sent: usize = 0; let mut bytes_sent: usize = 0; - let mut cursor = Cursor::new(buffer); + for (response, addr) in local_responses { + send_response( + config, + socket, + buffer, + &mut responses_sent, + &mut bytes_sent, + response, + addr, + ); + } - let response_iterator = local_responses.into_iter().chain( - response_receiver - .try_iter() - .map(|(response, addr)| (response.into(), addr)), - ); - - for (response, addr) in response_iterator { - cursor.set_position(0); - - let addr = if config.network.address.is_ipv4() { - if let SocketAddr::V4(addr) = addr { - SocketAddr::V4(addr) - } else { - unreachable!() - } - } else { - match addr { - SocketAddr::V4(addr) => { - let ip = addr.ip().to_ipv6_mapped(); - - SocketAddr::V6(SocketAddrV6::new(ip, addr.port(), 0, 0)) - } - addr => addr, - } + for (response, addr) in response_receiver.try_iter() { + let opt_response = match response { + ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r), + ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), + ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), }; - match response.write(&mut cursor) { - Ok(()) => { - let amt = cursor.position() as usize; - - match socket.send_to(&cursor.get_ref()[..amt], addr) { - Ok(amt) => { - responses_sent += 1; - bytes_sent += amt; - } - Err(err) => { - if err.kind() == ErrorKind::WouldBlock { - break; - } - - ::log::info!("send_to error: {}", err); - } - } - } - Err(err) => { - ::log::error!("Response::write error: {:?}", err); - } + if let Some(response) = opt_response { + send_response( + config, + socket, + buffer, + &mut responses_sent, + &mut bytes_sent, + response, + addr, + ); } } @@ -253,3 +239,51 @@ fn send_responses( .fetch_add(bytes_sent, Ordering::SeqCst); } } + +fn send_response( + config: &Config, + socket: &mut UdpSocket, + buffer: &mut [u8], + responses_sent: &mut usize, + bytes_sent: &mut usize, + response: Response, + addr: SocketAddr, +) { + let mut cursor = Cursor::new(buffer); + + let addr = if config.network.address.is_ipv4() { + if let SocketAddr::V4(addr) = addr { + SocketAddr::V4(addr) + } else { + unreachable!() + } + } else { + match addr { + SocketAddr::V4(addr) => { + let ip = addr.ip().to_ipv6_mapped(); + + SocketAddr::V6(SocketAddrV6::new(ip, addr.port(), 0, 0)) + } + addr => addr, + } + }; + + match response.write(&mut cursor) { + Ok(()) => { + let amt = cursor.position() as usize; + + match socket.send_to(&cursor.get_ref()[..amt], addr) { + Ok(amt) => { + *responses_sent += 1; + *bytes_sent += amt; + } + Err(err) => { + ::log::info!("send_to error: {}", err); + } + } + } + Err(err) => { + ::log::error!("Response::write error: {:?}", err); + } + } +} diff --git a/aquatic_udp/src/lib/network_uring.rs b/aquatic_udp/src/lib/network_uring.rs index 7d56705..63be9cc 100644 --- a/aquatic_udp/src/lib/network_uring.rs +++ b/aquatic_udp/src/lib/network_uring.rs @@ -11,7 +11,7 @@ use std::time::{Duration, Instant}; use aquatic_common::access_list::create_access_list_cache; use aquatic_common::ValidUntil; -use crossbeam_channel::{Receiver, Sender}; +use crossbeam_channel::Receiver; use io_uring::types::{Fixed, Timespec}; use io_uring::SubmissionQueue; use libc::{ @@ -103,7 +103,7 @@ impl Into for UserData { pub fn run_socket_worker( state: State, config: Config, - request_sender: Sender<(ConnectedRequest, SocketAddr)>, + request_sender: ConnectedRequestSender, response_receiver: Receiver<(ConnectedResponse, SocketAddr)>, num_bound_sockets: Arc, ) { @@ -114,6 +114,7 @@ pub fn run_socket_worker( num_bound_sockets.fetch_add(1, Ordering::SeqCst); let mut connections = ConnectionMap::default(); + let mut pending_scrape_responses = PendingScrapeResponseMap::default(); let mut access_list_cache = create_access_list_cache(&state.access_list); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); @@ -252,6 +253,7 @@ pub fn run_socket_worker( handle_request( &config, &mut connections, + &mut pending_scrape_responses, &mut access_list_cache, &mut rng, &request_sender, @@ -333,19 +335,27 @@ pub fn run_socket_worker( .try_iter() .take(MAX_SEND_EVENTS - send_entries.len()) { - queue_response( - &config, - &mut sq, - fd, - &mut send_entries, - &mut buffers, - &mut iovs, - &mut sockaddrs_ipv4, - &mut sockaddrs_ipv6, - &mut msghdrs, - response.into(), - addr, - ); + let opt_response = match response { + ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r), + ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), + ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), + }; + + if let Some(response) = opt_response { + queue_response( + &config, + &mut sq, + fd, + &mut send_entries, + &mut buffers, + &mut iovs, + &mut sockaddrs_ipv4, + &mut sockaddrs_ipv6, + &mut msghdrs, + response, + addr, + ); + } } if iter_counter % 32 == 0 { diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index 1354bdd..756bac7 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -16,7 +16,7 @@ use crate::config::BenchConfig; pub fn bench_announce_handler( bench_config: &BenchConfig, aquatic_config: &Config, - request_sender: &Sender<(ConnectedRequest, SocketAddr)>, + request_sender: &Sender<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>, response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, rng: &mut impl Rng, info_hashes: &[InfoHash], @@ -38,7 +38,11 @@ pub fn bench_announce_handler( for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { request_sender - .send((ConnectedRequest::Announce(request.clone()), *src)) + .send(( + SocketWorkerIndex(0), + ConnectedRequest::Announce(request.clone()), + *src, + )) .unwrap(); } diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index fb2895f..13c602e 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -39,21 +39,17 @@ fn main() { pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { // Setup common state, spawn request handlers - let state = State::default(); let aquatic_config = Config::default(); let (request_sender, request_receiver) = unbounded(); let (response_sender, response_receiver) = unbounded(); - for _ in 0..bench_config.num_threads { - let state = state.clone(); - let config = aquatic_config.clone(); - let request_receiver = request_receiver.clone(); - let response_sender = response_sender.clone(); + let response_sender = ConnectedResponseSender::new(vec![response_sender]); - ::std::thread::spawn(move || { - run_request_worker(state, config, request_receiver, response_sender) - }); + { + let config = aquatic_config.clone(); + + ::std::thread::spawn(move || run_request_worker(config, request_receiver, response_sender)); } // Run benchmarks diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 39d6ade..4cef9c0 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -16,7 +16,7 @@ use crate::config::BenchConfig; pub fn bench_scrape_handler( bench_config: &BenchConfig, aquatic_config: &Config, - request_sender: &Sender<(ConnectedRequest, SocketAddr)>, + request_sender: &Sender<(SocketWorkerIndex, ConnectedRequest, SocketAddr)>, response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, rng: &mut impl Rng, info_hashes: &[InfoHash], @@ -42,20 +42,25 @@ pub fn bench_scrape_handler( for round in (0..bench_config.num_rounds).progress_with(pb) { for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { - let request = ConnectedRequest::Scrape { - request: request.clone(), - original_indices: Vec::new(), - }; + let request = ConnectedRequest::Scrape(PendingScrapeRequest { + transaction_id: request.transaction_id, + info_hashes: request + .info_hashes + .clone() + .into_iter() + .enumerate() + .collect(), + }); - request_sender.send((request, *src)).unwrap(); + request_sender + .send((SocketWorkerIndex(0), request, *src)) + .unwrap(); } - while let Ok((ConnectedResponse::Scrape { response, .. }, _)) = - response_receiver.try_recv() - { + while let Ok((ConnectedResponse::Scrape(response), _)) = response_receiver.try_recv() { num_responses += 1; - if let Some(stat) = response.torrent_stats.last() { + if let Some(stat) = response.torrent_stats.values().last() { dummy ^= stat.leechers.0; } } @@ -64,10 +69,10 @@ pub fn bench_scrape_handler( let total = bench_config.num_scrape_requests * (round + 1); while num_responses < total { - if let Ok((ConnectedResponse::Scrape { response, .. }, _)) = response_receiver.recv() { + if let Ok((ConnectedResponse::Scrape(response), _)) = response_receiver.recv() { num_responses += 1; - if let Some(stat) = response.torrent_stats.last() { + if let Some(stat) = response.torrent_stats.values().last() { dummy ^= stat.leechers.0; } }