From 80754ab4ad94b81217c1727e548609539e9aaece Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 18 Oct 2021 23:17:42 +0200 Subject: [PATCH] aquatic_udp: start work on announce handler in glommio version --- aquatic_udp/src/lib/common/announce.rs | 178 ++++++++++++++++++ .../src/lib/{common.rs => common/mod.rs} | 3 + aquatic_udp/src/lib/common/network.rs | 0 aquatic_udp/src/lib/glommio/handlers.rs | 49 +++++ aquatic_udp/src/lib/glommio/mod.rs | 1 + aquatic_udp/src/lib/glommio/network.rs | 21 ++- aquatic_udp/src/lib/mio/handlers/announce.rs | 176 +---------------- 7 files changed, 244 insertions(+), 184 deletions(-) create mode 100644 aquatic_udp/src/lib/common/announce.rs rename aquatic_udp/src/lib/{common.rs => common/mod.rs} (99%) create mode 100644 aquatic_udp/src/lib/common/network.rs create mode 100644 aquatic_udp/src/lib/glommio/handlers.rs diff --git a/aquatic_udp/src/lib/common/announce.rs b/aquatic_udp/src/lib/common/announce.rs new file mode 100644 index 0000000..4a71787 --- /dev/null +++ b/aquatic_udp/src/lib/common/announce.rs @@ -0,0 +1,178 @@ +use rand::rngs::SmallRng; + +use aquatic_common::extract_response_peers; + +use crate::common::*; + +pub fn handle_announce_request( + config: &Config, + rng: &mut SmallRng, + torrents: &mut TorrentMap, + request: AnnounceRequest, + peer_ip: I, + peer_valid_until: ValidUntil, +) -> AnnounceResponse { + let peer_key = PeerMapKey { + ip: peer_ip, + peer_id: request.peer_id, + }; + + let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); + + let peer = Peer { + ip_address: peer_ip, + port: request.port, + status: peer_status, + valid_until: peer_valid_until, + }; + + let torrent_data = torrents.entry(request.info_hash).or_default(); + + let opt_removed_peer = match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers += 1; + + torrent_data.peers.insert(peer_key, peer) + } + PeerStatus::Seeding => { + torrent_data.num_seeders += 1; + + torrent_data.peers.insert(peer_key, peer) + } + PeerStatus::Stopped => torrent_data.peers.remove(&peer_key), + }; + + match opt_removed_peer.map(|peer| peer.status) { + Some(PeerStatus::Leeching) => { + torrent_data.num_leechers -= 1; + } + Some(PeerStatus::Seeding) => { + torrent_data.num_seeders -= 1; + } + _ => {} + } + + let max_num_peers_to_take = calc_max_num_peers_to_take(config, request.peers_wanted.0); + + let response_peers = extract_response_peers( + rng, + &torrent_data.peers, + max_num_peers_to_take, + peer_key, + Peer::to_response_peer, + ); + + AnnounceResponse { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), + leechers: NumberOfPeers(torrent_data.num_leechers as i32), + seeders: NumberOfPeers(torrent_data.num_seeders as i32), + peers: response_peers, + } +} + +#[inline] +fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { + if peers_wanted <= 0 { + config.protocol.max_response_peers as usize + } else { + ::std::cmp::min( + config.protocol.max_response_peers as usize, + peers_wanted as usize, + ) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::net::Ipv4Addr; + + use indexmap::IndexMap; + use quickcheck::{quickcheck, TestResult}; + use rand::thread_rng; + + use super::*; + + fn gen_peer_map_key_and_value(i: u32) -> (PeerMapKey, Peer) { + let ip_address = Ipv4Addr::from(i.to_be_bytes()); + let peer_id = PeerId([0; 20]); + + let key = PeerMapKey { + ip: ip_address, + peer_id, + }; + let value = Peer { + ip_address, + port: Port(1), + status: PeerStatus::Leeching, + valid_until: ValidUntil::new(0), + }; + + (key, value) + } + + #[test] + fn test_extract_response_peers() { + fn prop(data: (u16, u16)) -> TestResult { + let gen_num_peers = data.0 as u32; + let req_num_peers = data.1 as usize; + + let mut peer_map: PeerMap = IndexMap::with_capacity(gen_num_peers as usize); + + let mut opt_sender_key = None; + let mut opt_sender_peer = None; + + for i in 0..gen_num_peers { + let (key, value) = gen_peer_map_key_and_value((i << 16) + i); + + if i == 0 { + opt_sender_key = Some(key); + opt_sender_peer = Some(value.to_response_peer()); + } + + peer_map.insert(key, value); + } + + let mut rng = thread_rng(); + + let peers = extract_response_peers( + &mut rng, + &peer_map, + req_num_peers, + opt_sender_key.unwrap_or_else(|| gen_peer_map_key_and_value(1).0), + Peer::to_response_peer, + ); + + // Check that number of returned peers is correct + + let mut success = peers.len() <= req_num_peers; + + if req_num_peers >= gen_num_peers as usize { + success &= peers.len() == gen_num_peers as usize + || peers.len() + 1 == gen_num_peers as usize; + } + + // Check that returned peers are unique (no overlap) and that sender + // isn't returned + + let mut ip_addresses = HashSet::with_capacity(peers.len()); + + for peer in peers { + if peer == opt_sender_peer.clone().unwrap() + || ip_addresses.contains(&peer.ip_address) + { + success = false; + + break; + } + + ip_addresses.insert(peer.ip_address); + } + + TestResult::from_bool(success) + } + + quickcheck(prop as fn((u16, u16)) -> TestResult); + } +} \ No newline at end of file diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common/mod.rs similarity index 99% rename from aquatic_udp/src/lib/common.rs rename to aquatic_udp/src/lib/common/mod.rs index 5a81e46..5662e1f 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -12,6 +12,9 @@ pub use aquatic_udp_protocol::*; use crate::config::Config; +pub mod announce; +pub mod network; + pub const MAX_PACKET_SIZE: usize = 4096; pub trait Ip: Hash + PartialEq + Eq + Clone + Copy { diff --git a/aquatic_udp/src/lib/common/network.rs b/aquatic_udp/src/lib/common/network.rs new file mode 100644 index 0000000..e69de29 diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs new file mode 100644 index 0000000..771f59b --- /dev/null +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -0,0 +1,49 @@ +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + +use glommio::prelude::*; +use glommio::channels::shared_channel::{SharedReceiver, SharedSender}; +use rand::SeedableRng; +use rand::prelude::SmallRng; + +use crate::config::Config; +use crate::common::*; +use crate::common::announce::handle_announce_request; + +pub fn run_request_worker( + config: Config, + request_receiver: SharedReceiver<(AnnounceRequest, SocketAddr)>, + response_sender: SharedSender<(AnnounceResponse, SocketAddr)>, +) { + LocalExecutorBuilder::default() + .spawn(|| async move { + let request_receiver = request_receiver.connect().await; + let response_sender = response_sender.connect().await; + + let mut rng = SmallRng::from_entropy(); + + let mut torrents_ipv4 = TorrentMap::::default(); + let mut torrents_ipv6 = TorrentMap::::default(); + + // Needs to be updated periodically + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + + while let Some((request, addr)) = request_receiver.recv().await { + let response = match addr.ip() { + IpAddr::V4(ip) => { + handle_announce_request(&config, &mut rng, &mut torrents_ipv4, request, ip, peer_valid_until) + }, + IpAddr::V6(ip) => { + handle_announce_request(&config, &mut rng, &mut torrents_ipv6, request, ip, peer_valid_until) + }, + }; + + if let Err(err) = response_sender.try_send((response, addr)) { + ::log::warn!("response_sender.try_send: {:?}", err); + } + } + + }) + .expect("failed to spawn local executor") + .join() + .unwrap(); +} \ No newline at end of file diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index a61610b..030b2ee 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1 +1,2 @@ +pub mod handlers; pub mod network; diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 2cd9138..1a934e7 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -27,8 +27,8 @@ use crate::config::Config; pub fn run_socket_worker( state: State, config: Config, - request_sender: SharedSender<(ConnectedRequest, SocketAddr)>, - response_receiver: SharedReceiver<(ConnectedResponse, SocketAddr)>, + request_sender: SharedSender<(AnnounceRequest, SocketAddr)>, + response_receiver: SharedReceiver<(AnnounceResponse, SocketAddr)>, num_bound_sockets: Arc, ) { LocalExecutorBuilder::default() @@ -65,7 +65,7 @@ pub fn run_socket_worker( async fn read_requests( config: Config, access_list: Arc, - request_sender: SharedSender<(ConnectedRequest, SocketAddr)>, + request_sender: SharedSender<(AnnounceRequest, SocketAddr)>, local_sender: LocalSender<(Response, SocketAddr)>, socket: Rc, ) { @@ -102,7 +102,7 @@ async fn read_requests( if connections.contains(request.connection_id, src) { if access_list.allows(access_list_mode, &request.info_hash.0) { if let Err(err) = request_sender - .try_send((ConnectedRequest::Announce(request), src)) + .try_send((request, src)) { ::log::warn!("request_sender.try_send failed: {:?}", err) } @@ -118,11 +118,12 @@ async fn read_requests( } Ok(Request::Scrape(request)) => { if connections.contains(request.connection_id, src) { - if let Err(err) = - request_sender.try_send((ConnectedRequest::Scrape(request), src)) - { - ::log::warn!("request_sender.try_send failed: {:?}", err) - } + let response = Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Scrape requests not supported".into(), + }); + + local_sender.try_send((response, src)); } } Err(err) => { @@ -156,7 +157,7 @@ async fn read_requests( } async fn send_responses( - response_receiver: SharedReceiver<(ConnectedResponse, SocketAddr)>, + response_receiver: SharedReceiver<(AnnounceResponse, SocketAddr)>, local_receiver: LocalReceiver<(Response, SocketAddr)>, socket: Rc, ) { diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs index 913a0d6..43406f5 100644 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ b/aquatic_udp/src/lib/mio/handlers/announce.rs @@ -4,10 +4,11 @@ use std::vec::Drain; use parking_lot::MutexGuard; use rand::rngs::SmallRng; -use aquatic_common::{convert_ipv4_mapped_ipv6, extract_response_peers}; +use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_udp_protocol::*; use crate::common::*; +use crate::common::announce::handle_announce_request; use crate::config::Config; #[inline] @@ -45,176 +46,3 @@ pub fn handle_announce_requests( (ConnectedResponse::Announce(response), src) })); } - -fn handle_announce_request( - config: &Config, - rng: &mut SmallRng, - torrents: &mut TorrentMap, - request: AnnounceRequest, - peer_ip: I, - peer_valid_until: ValidUntil, -) -> AnnounceResponse { - let peer_key = PeerMapKey { - ip: peer_ip, - peer_id: request.peer_id, - }; - - let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); - - let peer = Peer { - ip_address: peer_ip, - port: request.port, - status: peer_status, - valid_until: peer_valid_until, - }; - - let torrent_data = torrents.entry(request.info_hash).or_default(); - - let opt_removed_peer = match peer_status { - PeerStatus::Leeching => { - torrent_data.num_leechers += 1; - - torrent_data.peers.insert(peer_key, peer) - } - PeerStatus::Seeding => { - torrent_data.num_seeders += 1; - - torrent_data.peers.insert(peer_key, peer) - } - PeerStatus::Stopped => torrent_data.peers.remove(&peer_key), - }; - - match opt_removed_peer.map(|peer| peer.status) { - Some(PeerStatus::Leeching) => { - torrent_data.num_leechers -= 1; - } - Some(PeerStatus::Seeding) => { - torrent_data.num_seeders -= 1; - } - _ => {} - } - - let max_num_peers_to_take = calc_max_num_peers_to_take(config, request.peers_wanted.0); - - let response_peers = extract_response_peers( - rng, - &torrent_data.peers, - max_num_peers_to_take, - peer_key, - Peer::to_response_peer, - ); - - AnnounceResponse { - transaction_id: request.transaction_id, - announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), - leechers: NumberOfPeers(torrent_data.num_leechers as i32), - seeders: NumberOfPeers(torrent_data.num_seeders as i32), - peers: response_peers, - } -} - -#[inline] -fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { - if peers_wanted <= 0 { - config.protocol.max_response_peers as usize - } else { - ::std::cmp::min( - config.protocol.max_response_peers as usize, - peers_wanted as usize, - ) - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - use std::net::Ipv4Addr; - - use indexmap::IndexMap; - use quickcheck::{quickcheck, TestResult}; - use rand::thread_rng; - - use super::*; - - fn gen_peer_map_key_and_value(i: u32) -> (PeerMapKey, Peer) { - let ip_address = Ipv4Addr::from(i.to_be_bytes()); - let peer_id = PeerId([0; 20]); - - let key = PeerMapKey { - ip: ip_address, - peer_id, - }; - let value = Peer { - ip_address, - port: Port(1), - status: PeerStatus::Leeching, - valid_until: ValidUntil::new(0), - }; - - (key, value) - } - - #[test] - fn test_extract_response_peers() { - fn prop(data: (u16, u16)) -> TestResult { - let gen_num_peers = data.0 as u32; - let req_num_peers = data.1 as usize; - - let mut peer_map: PeerMap = IndexMap::with_capacity(gen_num_peers as usize); - - let mut opt_sender_key = None; - let mut opt_sender_peer = None; - - for i in 0..gen_num_peers { - let (key, value) = gen_peer_map_key_and_value((i << 16) + i); - - if i == 0 { - opt_sender_key = Some(key); - opt_sender_peer = Some(value.to_response_peer()); - } - - peer_map.insert(key, value); - } - - let mut rng = thread_rng(); - - let peers = extract_response_peers( - &mut rng, - &peer_map, - req_num_peers, - opt_sender_key.unwrap_or_else(|| gen_peer_map_key_and_value(1).0), - Peer::to_response_peer, - ); - - // Check that number of returned peers is correct - - let mut success = peers.len() <= req_num_peers; - - if req_num_peers >= gen_num_peers as usize { - success &= peers.len() == gen_num_peers as usize - || peers.len() + 1 == gen_num_peers as usize; - } - - // Check that returned peers are unique (no overlap) and that sender - // isn't returned - - let mut ip_addresses = HashSet::with_capacity(peers.len()); - - for peer in peers { - if peer == opt_sender_peer.clone().unwrap() - || ip_addresses.contains(&peer.ip_address) - { - success = false; - - break; - } - - ip_addresses.insert(peer.ip_address); - } - - TestResult::from_bool(success) - } - - quickcheck(prop as fn((u16, u16)) -> TestResult); - } -}