From ff49022efe87d08128b8f1926e817aa037bbce70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 15 Oct 2021 23:47:42 +0200 Subject: [PATCH] aquatic_udp: split handlers.rs into submodules --- aquatic_udp/src/lib/handlers.rs | 452 ----------------------- aquatic_udp/src/lib/handlers/announce.rs | 234 ++++++++++++ aquatic_udp/src/lib/handlers/connect.rs | 39 ++ aquatic_udp/src/lib/handlers/mod.rs | 153 ++++++++ aquatic_udp/src/lib/handlers/scrape.rs | 64 ++++ aquatic_udp/src/lib/lib.rs | 2 +- aquatic_udp/src/lib/tasks.rs | 10 +- 7 files changed, 497 insertions(+), 457 deletions(-) delete mode 100644 aquatic_udp/src/lib/handlers.rs create mode 100644 aquatic_udp/src/lib/handlers/announce.rs create mode 100644 aquatic_udp/src/lib/handlers/connect.rs create mode 100644 aquatic_udp/src/lib/handlers/mod.rs create mode 100644 aquatic_udp/src/lib/handlers/scrape.rs diff --git a/aquatic_udp/src/lib/handlers.rs b/aquatic_udp/src/lib/handlers.rs deleted file mode 100644 index ecfeae5..0000000 --- a/aquatic_udp/src/lib/handlers.rs +++ /dev/null @@ -1,452 +0,0 @@ -use std::net::{IpAddr, SocketAddr}; -use std::time::Duration; -use std::vec::Drain; - -use crossbeam_channel::{Receiver, Sender}; -use parking_lot::MutexGuard; -use rand::{ - rngs::{SmallRng, StdRng}, - Rng, SeedableRng, -}; - -use aquatic_common::{convert_ipv4_mapped_ipv6, extract_response_peers}; -use aquatic_udp_protocol::*; - -use crate::common::*; -use crate::config::Config; - -pub fn run_request_worker( - state: State, - config: Config, - request_receiver: Receiver<(Request, SocketAddr)>, - response_sender: Sender<(Response, SocketAddr)>, -) { - let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::new(); - let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); - let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); - - let mut responses: Vec<(Response, SocketAddr)> = Vec::new(); - - let mut std_rng = StdRng::from_entropy(); - let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap(); - - let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds); - - loop { - let mut opt_connections = None; - - // Collect requests from channel, divide them by type - // - // Collect a maximum number of request. Stop collecting before that - // number is reached if having waited for too long for a request, but - // only if ConnectionMap mutex isn't locked. - for i in 0..config.handlers.max_requests_per_iter { - let (request, src): (Request, SocketAddr) = if i == 0 { - match request_receiver.recv() { - Ok(r) => r, - Err(_) => break, // Really shouldn't happen - } - } else { - match request_receiver.recv_timeout(timeout) { - Ok(r) => r, - Err(_) => { - if let Some(guard) = state.connections.try_lock() { - opt_connections = Some(guard); - - break; - } else { - continue; - } - } - } - }; - - match request { - Request::Connect(r) => connect_requests.push((r, src)), - Request::Announce(r) => announce_requests.push((r, src)), - Request::Scrape(r) => scrape_requests.push((r, src)), - } - } - - let mut connections: MutexGuard = - opt_connections.unwrap_or_else(|| state.connections.lock()); - - handle_connect_requests( - &config, - &mut connections, - &mut std_rng, - connect_requests.drain(..), - &mut responses, - ); - - // Check announce and scrape requests for valid connections - - announce_requests.retain(|(request, src)| { - let connection_key = ConnectionKey { - connection_id: request.connection_id, - socket_addr: *src, - }; - - if connections.contains_key(&connection_key) { - true - } else { - let response = ErrorResponse { - transaction_id: request.transaction_id, - message: "Connection invalid or expired".to_string(), - }; - - responses.push((response.into(), *src)); - - return false; - } - }); - - scrape_requests.retain(|(request, src)| { - let connection_key = ConnectionKey { - connection_id: request.connection_id, - socket_addr: *src, - }; - - if connections.contains_key(&connection_key) { - true - } else { - let response = ErrorResponse { - transaction_id: request.transaction_id, - message: "Connection invalid or expired".to_string(), - }; - - responses.push((response.into(), *src)); - - false - } - }); - - ::std::mem::drop(connections); - - // Generate responses for announce and scrape requests - - if !(announce_requests.is_empty() && scrape_requests.is_empty()) { - let mut torrents = state.torrents.lock(); - - handle_announce_requests( - &config, - &mut torrents, - &mut small_rng, - announce_requests.drain(..), - &mut responses, - ); - handle_scrape_requests(&mut torrents, scrape_requests.drain(..), &mut responses); - } - - for r in responses.drain(..) { - if let Err(err) = response_sender.send(r) { - ::log::error!("error sending response to channel: {}", err); - } - } - } -} - -#[inline] -pub fn handle_connect_requests( - config: &Config, - connections: &mut MutexGuard, - rng: &mut StdRng, - requests: Drain<(ConnectRequest, SocketAddr)>, - responses: &mut Vec<(Response, SocketAddr)>, -) { - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); - - responses.extend(requests.map(|(request, src)| { - let connection_id = ConnectionId(rng.gen()); - - let key = ConnectionKey { - connection_id, - socket_addr: src, - }; - - connections.insert(key, valid_until); - - let response = Response::Connect(ConnectResponse { - connection_id, - transaction_id: request.transaction_id, - }); - - (response, src) - })); -} - -#[inline] -pub fn handle_announce_requests( - config: &Config, - torrents: &mut MutexGuard, - rng: &mut SmallRng, - requests: Drain<(AnnounceRequest, SocketAddr)>, - responses: &mut Vec<(Response, SocketAddr)>, -) { - let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - let access_list_mode = config.access_list.mode; - - responses.extend(requests.map(|(request, src)| { - let info_hash_allowed = torrents.access_list.allows(access_list_mode, &request.info_hash.0); - - let response = if info_hash_allowed { - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - - let response = match peer_ip { - IpAddr::V4(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv4, - request, - ip, - peer_valid_until, - ), - IpAddr::V6(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv6, - request, - ip, - peer_valid_until, - ), - }; - - Response::Announce(response) - } else { - Response::Error(ErrorResponse { - transaction_id: request.transaction_id, - message: "Info hash not allowed".to_string(), - }) - }; - - (response, src) - })); -} - -fn handle_announce_request( - config: &Config, - rng: &mut SmallRng, - torrents: &mut TorrentMap, - request: AnnounceRequest, - peer_ip: I, - peer_valid_until: ValidUntil, -) -> AnnounceResponse { - let peer_key = PeerMapKey { - ip: peer_ip, - peer_id: request.peer_id, - }; - - let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); - - let peer = Peer { - ip_address: peer_ip, - port: request.port, - status: peer_status, - valid_until: peer_valid_until, - }; - - let torrent_data = torrents.entry(request.info_hash).or_default(); - - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; - - torrent_data.peers.insert(peer_key, peer) - } - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - torrent_data.peers.insert(peer_key, peer) - } - PeerStatus::Stopped => torrent_data.peers.remove(&peer_key), - }; - - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - } - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - } - _ => {} - } - - let max_num_peers_to_take = calc_max_num_peers_to_take(config, request.peers_wanted.0); - - let response_peers = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - peer_key, - Peer::to_response_peer, - ); - - AnnounceResponse { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), - leechers: NumberOfPeers(torrent_data.num_leechers as i32), - seeders: NumberOfPeers(torrent_data.num_seeders as i32), - peers: response_peers, - } -} - -#[inline] -pub fn handle_scrape_requests( - torrents: &mut MutexGuard, - requests: Drain<(ScrapeRequest, SocketAddr)>, - responses: &mut Vec<(Response, SocketAddr)>, -) { - let empty_stats = create_torrent_scrape_statistics(0, 0); - - responses.extend(requests.map(|(request, src)| { - let mut stats: Vec = Vec::with_capacity(request.info_hashes.len()); - - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - - if peer_ip.is_ipv4() { - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.ipv4.get(info_hash) { - stats.push(create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - )); - } else { - stats.push(empty_stats); - } - } - } else { - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.ipv6.get(info_hash) { - stats.push(create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - )); - } else { - stats.push(empty_stats); - } - } - } - - let response = Response::Scrape(ScrapeResponse { - transaction_id: request.transaction_id, - torrent_stats: stats, - }); - - (response, src) - })); -} - -#[inline] -fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { - if peers_wanted <= 0 { - config.protocol.max_response_peers as usize - } else { - ::std::cmp::min( - config.protocol.max_response_peers as usize, - peers_wanted as usize, - ) - } -} - -#[inline(always)] -pub fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { - TorrentScrapeStatistics { - seeders: NumberOfPeers(seeders), - completed: NumberOfDownloads(0), // No implementation planned - leechers: NumberOfPeers(leechers), - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - use std::net::Ipv4Addr; - - use indexmap::IndexMap; - use quickcheck::{quickcheck, TestResult}; - use rand::thread_rng; - - use super::*; - - fn gen_peer_map_key_and_value(i: u32) -> (PeerMapKey, Peer) { - let ip_address = Ipv4Addr::from(i.to_be_bytes()); - let peer_id = PeerId([0; 20]); - - let key = PeerMapKey { - ip: ip_address, - peer_id, - }; - let value = Peer { - ip_address, - port: Port(1), - status: PeerStatus::Leeching, - valid_until: ValidUntil::new(0), - }; - - (key, value) - } - - #[test] - fn test_extract_response_peers() { - fn prop(data: (u16, u16)) -> TestResult { - let gen_num_peers = data.0 as u32; - let req_num_peers = data.1 as usize; - - let mut peer_map: PeerMap = IndexMap::with_capacity(gen_num_peers as usize); - - let mut opt_sender_key = None; - let mut opt_sender_peer = None; - - for i in 0..gen_num_peers { - let (key, value) = gen_peer_map_key_and_value((i << 16) + i); - - if i == 0 { - opt_sender_key = Some(key); - opt_sender_peer = Some(value.to_response_peer()); - } - - peer_map.insert(key, value); - } - - let mut rng = thread_rng(); - - let peers = extract_response_peers( - &mut rng, - &peer_map, - req_num_peers, - opt_sender_key.unwrap_or_else(|| gen_peer_map_key_and_value(1).0), - Peer::to_response_peer, - ); - - // Check that number of returned peers is correct - - let mut success = peers.len() <= req_num_peers; - - if req_num_peers >= gen_num_peers as usize { - success &= peers.len() == gen_num_peers as usize - || peers.len() + 1 == gen_num_peers as usize; - } - - // Check that returned peers are unique (no overlap) and that sender - // isn't returned - - let mut ip_addresses = HashSet::with_capacity(peers.len()); - - for peer in peers { - if peer == opt_sender_peer.clone().unwrap() - || ip_addresses.contains(&peer.ip_address) - { - success = false; - - break; - } - - ip_addresses.insert(peer.ip_address); - } - - TestResult::from_bool(success) - } - - quickcheck(prop as fn((u16, u16)) -> TestResult); - } -} diff --git a/aquatic_udp/src/lib/handlers/announce.rs b/aquatic_udp/src/lib/handlers/announce.rs new file mode 100644 index 0000000..9c718ac --- /dev/null +++ b/aquatic_udp/src/lib/handlers/announce.rs @@ -0,0 +1,234 @@ +use std::net::{IpAddr, SocketAddr}; +use std::vec::Drain; + +use parking_lot::MutexGuard; +use rand::rngs::SmallRng; + +use aquatic_common::{convert_ipv4_mapped_ipv6, extract_response_peers}; +use aquatic_udp_protocol::*; + +use crate::common::*; +use crate::config::Config; + +#[inline] +pub fn handle_announce_requests( + config: &Config, + torrents: &mut MutexGuard, + rng: &mut SmallRng, + requests: Drain<(AnnounceRequest, SocketAddr)>, + responses: &mut Vec<(Response, SocketAddr)>, +) { + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + let access_list_mode = config.access_list.mode; + + responses.extend(requests.map(|(request, src)| { + let info_hash_allowed = torrents + .access_list + .allows(access_list_mode, &request.info_hash.0); + + let response = if info_hash_allowed { + let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); + + let response = match peer_ip { + IpAddr::V4(ip) => handle_announce_request( + config, + rng, + &mut torrents.ipv4, + request, + ip, + peer_valid_until, + ), + IpAddr::V6(ip) => handle_announce_request( + config, + rng, + &mut torrents.ipv6, + request, + ip, + peer_valid_until, + ), + }; + + Response::Announce(response) + } else { + Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Info hash not allowed".to_string(), + }) + }; + + (response, src) + })); +} + +fn handle_announce_request( + config: &Config, + rng: &mut SmallRng, + torrents: &mut TorrentMap, + request: AnnounceRequest, + peer_ip: I, + peer_valid_until: ValidUntil, +) -> AnnounceResponse { + let peer_key = PeerMapKey { + ip: peer_ip, + peer_id: request.peer_id, + }; + + let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); + + let peer = Peer { + ip_address: peer_ip, + port: request.port, + status: peer_status, + valid_until: peer_valid_until, + }; + + let torrent_data = torrents.entry(request.info_hash).or_default(); + + let opt_removed_peer = match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers += 1; + + torrent_data.peers.insert(peer_key, peer) + } + PeerStatus::Seeding => { + torrent_data.num_seeders += 1; + + torrent_data.peers.insert(peer_key, peer) + } + PeerStatus::Stopped => torrent_data.peers.remove(&peer_key), + }; + + match opt_removed_peer.map(|peer| peer.status) { + Some(PeerStatus::Leeching) => { + torrent_data.num_leechers -= 1; + } + Some(PeerStatus::Seeding) => { + torrent_data.num_seeders -= 1; + } + _ => {} + } + + let max_num_peers_to_take = calc_max_num_peers_to_take(config, request.peers_wanted.0); + + let response_peers = extract_response_peers( + rng, + &torrent_data.peers, + max_num_peers_to_take, + peer_key, + Peer::to_response_peer, + ); + + AnnounceResponse { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), + leechers: NumberOfPeers(torrent_data.num_leechers as i32), + seeders: NumberOfPeers(torrent_data.num_seeders as i32), + peers: response_peers, + } +} + +#[inline] +fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { + if peers_wanted <= 0 { + config.protocol.max_response_peers as usize + } else { + ::std::cmp::min( + config.protocol.max_response_peers as usize, + peers_wanted as usize, + ) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::net::Ipv4Addr; + + use indexmap::IndexMap; + use quickcheck::{quickcheck, TestResult}; + use rand::thread_rng; + + use super::*; + + fn gen_peer_map_key_and_value(i: u32) -> (PeerMapKey, Peer) { + let ip_address = Ipv4Addr::from(i.to_be_bytes()); + let peer_id = PeerId([0; 20]); + + let key = PeerMapKey { + ip: ip_address, + peer_id, + }; + let value = Peer { + ip_address, + port: Port(1), + status: PeerStatus::Leeching, + valid_until: ValidUntil::new(0), + }; + + (key, value) + } + + #[test] + fn test_extract_response_peers() { + fn prop(data: (u16, u16)) -> TestResult { + let gen_num_peers = data.0 as u32; + let req_num_peers = data.1 as usize; + + let mut peer_map: PeerMap = IndexMap::with_capacity(gen_num_peers as usize); + + let mut opt_sender_key = None; + let mut opt_sender_peer = None; + + for i in 0..gen_num_peers { + let (key, value) = gen_peer_map_key_and_value((i << 16) + i); + + if i == 0 { + opt_sender_key = Some(key); + opt_sender_peer = Some(value.to_response_peer()); + } + + peer_map.insert(key, value); + } + + let mut rng = thread_rng(); + + let peers = extract_response_peers( + &mut rng, + &peer_map, + req_num_peers, + opt_sender_key.unwrap_or_else(|| gen_peer_map_key_and_value(1).0), + Peer::to_response_peer, + ); + + // Check that number of returned peers is correct + + let mut success = peers.len() <= req_num_peers; + + if req_num_peers >= gen_num_peers as usize { + success &= peers.len() == gen_num_peers as usize + || peers.len() + 1 == gen_num_peers as usize; + } + + // Check that returned peers are unique (no overlap) and that sender + // isn't returned + + let mut ip_addresses = HashSet::with_capacity(peers.len()); + + for peer in peers { + if peer == opt_sender_peer.clone().unwrap() + || ip_addresses.contains(&peer.ip_address) + { + success = false; + + break; + } + + ip_addresses.insert(peer.ip_address); + } + + TestResult::from_bool(success) + } + + quickcheck(prop as fn((u16, u16)) -> TestResult); + } +} diff --git a/aquatic_udp/src/lib/handlers/connect.rs b/aquatic_udp/src/lib/handlers/connect.rs new file mode 100644 index 0000000..e058241 --- /dev/null +++ b/aquatic_udp/src/lib/handlers/connect.rs @@ -0,0 +1,39 @@ +use std::net::SocketAddr; +use std::vec::Drain; + +use parking_lot::MutexGuard; +use rand::{rngs::StdRng, Rng}; + +use aquatic_udp_protocol::*; + +use crate::common::*; +use crate::config::Config; + +#[inline] +pub fn handle_connect_requests( + config: &Config, + connections: &mut MutexGuard, + rng: &mut StdRng, + requests: Drain<(ConnectRequest, SocketAddr)>, + responses: &mut Vec<(Response, SocketAddr)>, +) { + let valid_until = ValidUntil::new(config.cleaning.max_connection_age); + + responses.extend(requests.map(|(request, src)| { + let connection_id = ConnectionId(rng.gen()); + + let key = ConnectionKey { + connection_id, + socket_addr: src, + }; + + connections.insert(key, valid_until); + + let response = Response::Connect(ConnectResponse { + connection_id, + transaction_id: request.transaction_id, + }); + + (response, src) + })); +} diff --git a/aquatic_udp/src/lib/handlers/mod.rs b/aquatic_udp/src/lib/handlers/mod.rs new file mode 100644 index 0000000..96fa2ba --- /dev/null +++ b/aquatic_udp/src/lib/handlers/mod.rs @@ -0,0 +1,153 @@ +use std::net::SocketAddr; +use std::time::Duration; + +use crossbeam_channel::{Receiver, Sender}; +use parking_lot::MutexGuard; +use rand::{ + rngs::{SmallRng, StdRng}, + SeedableRng, +}; + +use aquatic_udp_protocol::*; + +use crate::common::*; +use crate::config::Config; + +mod announce; +mod connect; +mod scrape; + +use announce::handle_announce_requests; +use connect::handle_connect_requests; +use scrape::handle_scrape_requests; + +pub fn run_request_worker( + state: State, + config: Config, + request_receiver: Receiver<(Request, SocketAddr)>, + response_sender: Sender<(Response, SocketAddr)>, +) { + let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::new(); + let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); + let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); + + let mut responses: Vec<(Response, SocketAddr)> = Vec::new(); + + let mut std_rng = StdRng::from_entropy(); + let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap(); + + let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds); + + loop { + let mut opt_connections = None; + + // Collect requests from channel, divide them by type + // + // Collect a maximum number of request. Stop collecting before that + // number is reached if having waited for too long for a request, but + // only if ConnectionMap mutex isn't locked. + for i in 0..config.handlers.max_requests_per_iter { + let (request, src): (Request, SocketAddr) = if i == 0 { + match request_receiver.recv() { + Ok(r) => r, + Err(_) => break, // Really shouldn't happen + } + } else { + match request_receiver.recv_timeout(timeout) { + Ok(r) => r, + Err(_) => { + if let Some(guard) = state.connections.try_lock() { + opt_connections = Some(guard); + + break; + } else { + continue; + } + } + } + }; + + match request { + Request::Connect(r) => connect_requests.push((r, src)), + Request::Announce(r) => announce_requests.push((r, src)), + Request::Scrape(r) => scrape_requests.push((r, src)), + } + } + + let mut connections: MutexGuard = + opt_connections.unwrap_or_else(|| state.connections.lock()); + + handle_connect_requests( + &config, + &mut connections, + &mut std_rng, + connect_requests.drain(..), + &mut responses, + ); + + // Check announce and scrape requests for valid connections + + announce_requests.retain(|(request, src)| { + let connection_key = ConnectionKey { + connection_id: request.connection_id, + socket_addr: *src, + }; + + if connections.contains_key(&connection_key) { + true + } else { + let response = ErrorResponse { + transaction_id: request.transaction_id, + message: "Connection invalid or expired".to_string(), + }; + + responses.push((response.into(), *src)); + + return false; + } + }); + + scrape_requests.retain(|(request, src)| { + let connection_key = ConnectionKey { + connection_id: request.connection_id, + socket_addr: *src, + }; + + if connections.contains_key(&connection_key) { + true + } else { + let response = ErrorResponse { + transaction_id: request.transaction_id, + message: "Connection invalid or expired".to_string(), + }; + + responses.push((response.into(), *src)); + + false + } + }); + + ::std::mem::drop(connections); + + // Generate responses for announce and scrape requests + + if !(announce_requests.is_empty() && scrape_requests.is_empty()) { + let mut torrents = state.torrents.lock(); + + handle_announce_requests( + &config, + &mut torrents, + &mut small_rng, + announce_requests.drain(..), + &mut responses, + ); + handle_scrape_requests(&mut torrents, scrape_requests.drain(..), &mut responses); + } + + for r in responses.drain(..) { + if let Err(err) = response_sender.send(r) { + ::log::error!("error sending response to channel: {}", err); + } + } + } +} diff --git a/aquatic_udp/src/lib/handlers/scrape.rs b/aquatic_udp/src/lib/handlers/scrape.rs new file mode 100644 index 0000000..8198bd8 --- /dev/null +++ b/aquatic_udp/src/lib/handlers/scrape.rs @@ -0,0 +1,64 @@ +use std::net::SocketAddr; +use std::vec::Drain; + +use parking_lot::MutexGuard; + +use aquatic_common::convert_ipv4_mapped_ipv6; +use aquatic_udp_protocol::*; + +use crate::common::*; + +#[inline] +pub fn handle_scrape_requests( + torrents: &mut MutexGuard, + requests: Drain<(ScrapeRequest, SocketAddr)>, + responses: &mut Vec<(Response, SocketAddr)>, +) { + let empty_stats = create_torrent_scrape_statistics(0, 0); + + responses.extend(requests.map(|(request, src)| { + let mut stats: Vec = Vec::with_capacity(request.info_hashes.len()); + + let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); + + if peer_ip.is_ipv4() { + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = torrents.ipv4.get(info_hash) { + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders as i32, + torrent_data.num_leechers as i32, + )); + } else { + stats.push(empty_stats); + } + } + } else { + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = torrents.ipv6.get(info_hash) { + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders as i32, + torrent_data.num_leechers as i32, + )); + } else { + stats.push(empty_stats); + } + } + } + + let response = Response::Scrape(ScrapeResponse { + transaction_id: request.transaction_id, + torrent_stats: stats, + }); + + (response, src) + })); +} + +#[inline(always)] +fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { + TorrentScrapeStatistics { + seeders: NumberOfPeers(seeders), + completed: NumberOfDownloads(0), // No implementation planned + leechers: NumberOfPeers(leechers), + } +} diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 3f099c8..20c70e1 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -54,7 +54,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); - + tasks::clean_connections(&state); let mut torrent_maps = state.torrents.lock(); diff --git a/aquatic_udp/src/lib/tasks.rs b/aquatic_udp/src/lib/tasks.rs index 4f35aec..9b18679 100644 --- a/aquatic_udp/src/lib/tasks.rs +++ b/aquatic_udp/src/lib/tasks.rs @@ -8,14 +8,17 @@ use aquatic_common::access_list::AccessListMode; use crate::common::*; use crate::config::Config; -pub fn update_access_list(config: &Config, torrent_maps: &mut TorrentMaps){ +pub fn update_access_list(config: &Config, torrent_maps: &mut TorrentMaps) { match config.access_list.mode { AccessListMode::Require | AccessListMode::Forbid => { - if let Err(err) = torrent_maps.access_list.update_from_path(&config.access_list.path) { + if let Err(err) = torrent_maps + .access_list + .update_from_path(&config.access_list.path) + { ::log::error!("Update access list from path: {:?}", err); } } - AccessListMode::Ignore => { } + AccessListMode::Ignore => {} } } @@ -28,7 +31,6 @@ pub fn clean_connections(state: &State) { connections.shrink_to_fit(); } - pub fn gather_and_print_statistics(state: &State, config: &Config) { let interval = config.statistics.interval;