diff --git a/aquatic_udp/src/lib/common/announce.rs b/aquatic_udp/src/lib/common/handlers.rs similarity index 69% rename from aquatic_udp/src/lib/common/announce.rs rename to aquatic_udp/src/lib/common/handlers.rs index 2a63b61..41d7728 100644 --- a/aquatic_udp/src/lib/common/announce.rs +++ b/aquatic_udp/src/lib/common/handlers.rs @@ -1,9 +1,33 @@ +use std::net::SocketAddr; + use rand::rngs::SmallRng; use aquatic_common::extract_response_peers; +use aquatic_common::convert_ipv4_mapped_ipv6; use crate::common::*; +#[derive(Debug)] +pub enum ConnectedRequest { + Announce(AnnounceRequest), + Scrape(ScrapeRequest), +} + +#[derive(Debug)] +pub enum ConnectedResponse { + Announce(AnnounceResponse), + Scrape(ScrapeResponse), +} + +impl Into for ConnectedResponse { + fn into(self) -> Response { + match self { + Self::Announce(response) => Response::Announce(response), + Self::Scrape(response) => Response::Scrape(response), + } + } +} + pub fn handle_announce_request( config: &Config, rng: &mut SmallRng, @@ -83,6 +107,57 @@ fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { } } +#[inline] +pub fn handle_scrape_request( + torrents: &mut TorrentMaps, + src: SocketAddr, + request: ScrapeRequest, +) -> ScrapeResponse { + let empty_stats = create_torrent_scrape_statistics(0, 0); + + let mut stats: Vec = Vec::with_capacity(request.info_hashes.len()); + + let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); + + if peer_ip.is_ipv4() { + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = torrents.ipv4.get(info_hash) { + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders as i32, + torrent_data.num_leechers as i32, + )); + } else { + stats.push(empty_stats); + } + } + } else { + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = torrents.ipv6.get(info_hash) { + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders as i32, + torrent_data.num_leechers as i32, + )); + } else { + stats.push(empty_stats); + } + } + } + + ScrapeResponse { + transaction_id: request.transaction_id, + torrent_stats: stats, + } +} + +#[inline(always)] +fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { + TorrentScrapeStatistics { + seeders: NumberOfPeers(seeders), + completed: NumberOfDownloads(0), // No implementation planned + leechers: NumberOfPeers(leechers), + } +} + #[cfg(test)] mod tests { use std::collections::HashSet; diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index 8c84c3c..bcc073b 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -11,7 +11,7 @@ pub use aquatic_udp_protocol::*; use crate::config::Config; -pub mod announce; +pub mod handlers; pub mod network; pub const MAX_PACKET_SIZE: usize = 4096; diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 8ccc0c9..88ddc33 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -10,15 +10,16 @@ use glommio::{enclose, prelude::*}; use rand::prelude::SmallRng; use rand::SeedableRng; -use crate::common::announce::handle_announce_request; +use crate::common::handlers::handle_announce_request; use crate::common::*; +use crate::common::handlers::*; use crate::config::Config; use crate::glommio::common::update_access_list; pub async fn run_request_worker( config: Config, - request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, - response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, + request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>, + response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>, access_list: AccessList, ) { let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); @@ -62,10 +63,10 @@ pub async fn run_request_worker( async fn handle_request_stream( config: Config, torrents: Rc>, - response_senders: Rc>, + response_senders: Rc>, mut stream: S, ) where - S: Stream + ::std::marker::Unpin, + S: Stream + ::std::marker::Unpin, { let mut rng = SmallRng::from_entropy(); @@ -81,23 +82,30 @@ async fn handle_request_stream( })); while let Some((producer_index, request, addr)) = stream.next().await { - let response = match addr.ip() { - IpAddr::V4(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv4, - request, - ip, - peer_valid_until.borrow().to_owned(), - ), - IpAddr::V6(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv6, - request, - ip, - peer_valid_until.borrow().to_owned(), - ), + let response = match request { + ConnectedRequest::Announce(request) => { + ConnectedResponse::Announce(match addr.ip() { + IpAddr::V4(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut().ipv4, + request, + ip, + peer_valid_until.borrow().to_owned(), + ), + IpAddr::V6(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut().ipv6, + request, + ip, + peer_valid_until.borrow().to_owned(), + ), + }) + } + ConnectedRequest::Scrape(request) => { + ConnectedResponse::Scrape(handle_scrape_request(&mut torrents.borrow_mut(), addr, request)) + } }; ::log::debug!("preparing to send response to channel: {:?}", response); diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index d66db0d..22516a7 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -1,5 +1,6 @@ use std::cell::RefCell; use std::io::Cursor; +use std::iter::FromIterator; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; use std::sync::{ @@ -15,20 +16,45 @@ use glommio::enclose; use glommio::net::UdpSocket; use glommio::prelude::*; use glommio::timer::TimerActionRepeat; +use hashbrown::HashMap; use rand::prelude::{Rng, SeedableRng, StdRng}; use aquatic_udp_protocol::{IpVersion, Request, Response}; use super::common::update_access_list; +use crate::common::handlers::*; use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; +struct PendingScrapeResponse { + pending_worker_responses: usize, + valid_until: ValidUntil, + src: SocketAddr, + stats: Vec, +} + +#[derive(Default)] +struct PendingScrapeResponses(HashMap); + +impl PendingScrapeResponses { + fn insert_empty(&mut self, transaction_id: TransactionId, src: SocketAddr, pending_worker_responses: usize, valid_until: ValidUntil) { + let pending = PendingScrapeResponse { + pending_worker_responses, + valid_until, + src, + stats: Vec::new(), + }; + + self.0.insert(transaction_id, pending); + } +} + pub async fn run_socket_worker( config: Config, - request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, - response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, + request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>, + response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>, num_bound_sockets: Arc, access_list: AccessList, ) { @@ -52,12 +78,15 @@ pub async fn run_socket_worker( let response_consumer_index = response_receivers.consumer_id().unwrap(); + let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default())); + spawn_local(read_requests( config.clone(), request_senders, response_consumer_index, local_sender, socket.clone(), + pending_scrape_responses, access_list, )) .detach(); @@ -75,10 +104,11 @@ pub async fn run_socket_worker( async fn read_requests( config: Config, - request_senders: Senders<(usize, AnnounceRequest, SocketAddr)>, + request_senders: Senders<(usize, ConnectedRequest, SocketAddr)>, response_consumer_index: usize, local_sender: LocalSender<(Response, SocketAddr)>, socket: Rc, + pending_scrape_responses: Rc>, access_list: AccessList, ) { let mut rng = StdRng::from_entropy(); @@ -150,11 +180,11 @@ async fn read_requests( .allows(access_list_mode, &request.info_hash.0) { let request_consumer_index = - (request.info_hash.0[0] as usize) % config.request_workers; + calculate_request_consumer_index(&config, request.info_hash); if let Err(err) = request_senders.try_send_to( request_consumer_index, - (response_consumer_index, request, src), + (response_consumer_index, ConnectedRequest::Announce(request), src), ) { ::log::warn!("request_sender.try_send failed: {:?}", err) } @@ -170,12 +200,36 @@ async fn read_requests( } Ok(Request::Scrape(request)) => { if connections.borrow().contains(request.connection_id, src) { - let response = Response::Error(ErrorResponse { - transaction_id: request.transaction_id, - message: "Scrape requests not supported".into(), - }); + let mut consumer_requests: HashMap = HashMap::new(); - local_sender.try_send((response, src)).unwrap(); + for info_hash in request.info_hashes { + consumer_requests + .entry(calculate_request_consumer_index(&config, info_hash)) + .or_insert( + ScrapeRequest { + transaction_id: request.transaction_id, + connection_id: request.connection_id, + info_hashes: Vec::new(), + } + ) + .info_hashes.push(info_hash); + } + + pending_scrape_responses.borrow_mut().insert_empty( + request.transaction_id, + src, + consumer_requests.len(), + connection_valid_until.borrow().to_owned(), // FIXME: use seperate ValidUntil + ); + + for (consumer_index, request) in consumer_requests { + if let Err(err) = request_senders.try_send_to( + consumer_index, + (response_consumer_index, ConnectedRequest::Scrape(request), src), + ) { + ::log::warn!("request_sender.try_send failed: {:?}", err) + } + } } } Err(err) => { @@ -234,6 +288,10 @@ where } } +fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { + (info_hash.0[0] as usize) % config.request_workers +} + fn ip_version_from_ip(ip: IpAddr) -> IpVersion { match ip { IpAddr::V4(_) => IpVersion::IPv4, diff --git a/aquatic_udp/src/lib/mio/common.rs b/aquatic_udp/src/lib/mio/common.rs index 8bf2233..bcaff2f 100644 --- a/aquatic_udp/src/lib/mio/common.rs +++ b/aquatic_udp/src/lib/mio/common.rs @@ -4,25 +4,6 @@ use std::sync::{atomic::AtomicUsize, Arc}; use crate::common::*; -pub enum ConnectedRequest { - Announce(AnnounceRequest), - Scrape(ScrapeRequest), -} - -pub enum ConnectedResponse { - Announce(AnnounceResponse), - Scrape(ScrapeResponse), -} - -impl Into for ConnectedResponse { - fn into(self) -> Response { - match self { - Self::Announce(response) => Response::Announce(response), - Self::Scrape(response) => Response::Scrape(response), - } - } -} - #[derive(Default)] pub struct Statistics { pub requests_received: AtomicUsize, diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs index 549d061..b399e3b 100644 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ b/aquatic_udp/src/lib/mio/handlers/announce.rs @@ -7,10 +7,10 @@ use rand::rngs::SmallRng; use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_udp_protocol::*; -use crate::common::announce::handle_announce_request; +use crate::common::handlers::handle_announce_request; use crate::common::*; use crate::config::Config; -use crate::mio::common::*; +use crate::common::handlers::*; #[inline] pub fn handle_announce_requests( diff --git a/aquatic_udp/src/lib/mio/handlers/mod.rs b/aquatic_udp/src/lib/mio/handlers/mod.rs index af8e5a8..5afd71b 100644 --- a/aquatic_udp/src/lib/mio/handlers/mod.rs +++ b/aquatic_udp/src/lib/mio/handlers/mod.rs @@ -7,13 +7,11 @@ use rand::{rngs::SmallRng, SeedableRng}; use aquatic_udp_protocol::*; use crate::config::Config; +use crate::common::handlers::*; use crate::mio::common::*; mod announce; -mod scrape; - use announce::handle_announce_requests; -use scrape::handle_scrape_requests; pub fn run_request_worker( state: State, @@ -76,7 +74,9 @@ pub fn run_request_worker( &mut responses, ); - handle_scrape_requests(&mut torrents, scrape_requests.drain(..), &mut responses); + responses.extend(scrape_requests.drain(..).map(|(request, src)| { + (ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)), src) + })); } for r in responses.drain(..) { diff --git a/aquatic_udp/src/lib/mio/handlers/scrape.rs b/aquatic_udp/src/lib/mio/handlers/scrape.rs deleted file mode 100644 index b1a6742..0000000 --- a/aquatic_udp/src/lib/mio/handlers/scrape.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::net::SocketAddr; -use std::vec::Drain; - -use parking_lot::MutexGuard; - -use aquatic_common::convert_ipv4_mapped_ipv6; -use aquatic_udp_protocol::*; - -use crate::mio::common::*; - -use crate::common::*; - -#[inline] -pub fn handle_scrape_requests( - torrents: &mut MutexGuard, - requests: Drain<(ScrapeRequest, SocketAddr)>, - responses: &mut Vec<(ConnectedResponse, SocketAddr)>, -) { - let empty_stats = create_torrent_scrape_statistics(0, 0); - - responses.extend(requests.map(|(request, src)| { - let mut stats: Vec = Vec::with_capacity(request.info_hashes.len()); - - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - - if peer_ip.is_ipv4() { - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.ipv4.get(info_hash) { - stats.push(create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - )); - } else { - stats.push(empty_stats); - } - } - } else { - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.ipv6.get(info_hash) { - stats.push(create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - )); - } else { - stats.push(empty_stats); - } - } - } - - let response = ConnectedResponse::Scrape(ScrapeResponse { - transaction_id: request.transaction_id, - torrent_stats: stats, - }); - - (response, src) - })); -} - -#[inline(always)] -fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { - TorrentScrapeStatistics { - seeders: NumberOfPeers(seeders), - completed: NumberOfDownloads(0), // No implementation planned - leechers: NumberOfPeers(leechers), - } -} diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index fe34023..1739dd7 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -18,6 +18,7 @@ use aquatic_udp_protocol::{IpVersion, Request, Response}; use crate::common::network::ConnectionMap; use crate::common::*; +use crate::common::handlers::*; use crate::config::Config; use super::common::*; diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index d71f77c..eb09c3a 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -8,7 +8,7 @@ use rand_distr::Pareto; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::mio::common::*; +use aquatic_udp::common::handlers::*; use crate::common::*; use crate::config::BenchConfig; diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index a7d5c18..450b65e 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -8,7 +8,7 @@ use rand_distr::Pareto; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::mio::common::*; +use aquatic_udp::common::handlers::*; use crate::common::*; use crate::config::BenchConfig;