diff --git a/Cargo.lock b/Cargo.lock index a40df33..058650b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,10 +21,19 @@ version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9a60d744a80c30fcb657dfe2c1b22bcb3e814c1a1e3674f32bf5820b570fbff" +[[package]] +name = "aquatic_common" +version = "0.1.0" +dependencies = [ + "indexmap", + "rand", +] + [[package]] name = "aquatic_udp" version = "0.1.0" dependencies = [ + "aquatic_common", "bittorrent_udp", "cli_helpers", "crossbeam-channel", @@ -82,7 +91,7 @@ dependencies = [ name = "aquatic_ws" version = "0.1.0" dependencies = [ - "aquatic_udp", + "aquatic_common", "bittorrent_udp", "cli_helpers", "flume", diff --git a/Cargo.toml b/Cargo.toml index f6c39dc..6d19ae2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ + "aquatic_common", "aquatic_udp", "aquatic_udp_bench", "aquatic_udp_load_test", diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml new file mode 100644 index 0000000..d5ef54b --- /dev/null +++ b/aquatic_common/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "aquatic_common" +version = "0.1.0" +authors = ["Joakim FrostegÄrd "] +edition = "2018" +license = "Apache-2.0" + +[lib] +name = "aquatic_common" + +[dependencies] +indexmap = "1" +rand = { version = "0.7", features = ["small_rng"] } \ No newline at end of file diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs new file mode 100644 index 0000000..d1ed185 --- /dev/null +++ b/aquatic_common/src/lib.rs @@ -0,0 +1,80 @@ +use std::time::{Duration, Instant}; + +use indexmap::IndexMap; +use rand::Rng; + + +/// Peer or connection valid until this instant +/// +/// Used instead of "last seen" or similar to hopefully prevent arithmetic +/// overflow when cleaning. +#[derive(Debug, Clone, Copy)] +pub struct ValidUntil(pub Instant); + + +impl ValidUntil { + #[inline] + pub fn new(offset_seconds: u64) -> Self { + Self(Instant::now() + Duration::from_secs(offset_seconds)) + } +} + + +/// Extract response peers +/// +/// If there are more peers in map than `max_num_peers_to_take`, do a +/// half-random selection of peers from first and second halves of map, +/// in order to avoid returning too homogeneous peers. +/// +/// Don't care if we send back announcing peer. +#[inline] +pub fn extract_response_peers( + rng: &mut impl Rng, + peer_map: &IndexMap, + max_num_peers_to_take: usize, + peer_conversion_function: F +) -> Vec + where + K: Eq + ::std::hash::Hash, + F: Fn(&V) -> R +{ + let peer_map_len = peer_map.len(); + + if peer_map_len <= max_num_peers_to_take { + peer_map.values() + .map(peer_conversion_function) + .collect() + } else { + let half_num_to_take = max_num_peers_to_take / 2; + let half_peer_map_len = peer_map_len / 2; + + let offset_first_half = rng.gen_range( + 0, + (half_peer_map_len + (peer_map_len % 2)) - half_num_to_take + ); + let offset_second_half = rng.gen_range( + half_peer_map_len, + peer_map_len - half_num_to_take + ); + + let end_first_half = offset_first_half + half_num_to_take; + let end_second_half = offset_second_half + half_num_to_take + (max_num_peers_to_take % 2); + + let mut peers: Vec = Vec::with_capacity(max_num_peers_to_take); + + for i in offset_first_half..end_first_half { + if let Some((_, peer)) = peer_map.get_index(i){ + peers.push(peer_conversion_function(peer)) + } + } + for i in offset_second_half..end_second_half { + if let Some((_, peer)) = peer_map.get_index(i){ + peers.push(peer_conversion_function(peer)) + } + } + + debug_assert_eq!(peers.len(), max_num_peers_to_take); + + peers + } +} \ No newline at end of file diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index a7aaf30..69172d2 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -13,6 +13,7 @@ path = "src/lib/lib.rs" name = "aquatic_udp" [dependencies] +aquatic_common = { path = "../aquatic_common" } bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } crossbeam-channel = "0.4" diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index 086a2de..31d3d7a 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -1,31 +1,17 @@ use std::net::{SocketAddr, IpAddr}; use std::sync::{Arc, atomic::AtomicUsize}; -use std::time::{Duration, Instant}; use hashbrown::HashMap; use indexmap::IndexMap; use parking_lot::Mutex; +pub use aquatic_common::ValidUntil; pub use bittorrent_udp::types::*; pub const MAX_PACKET_SIZE: usize = 4096; -/// Peer or connection valid until this instant -/// -/// Used instead of "last seen" or similar to hopefully prevent arithmetic -/// overflow when cleaning. -#[derive(Debug, Clone, Copy)] -pub struct ValidUntil(pub Instant); - - -impl ValidUntil { - pub fn new(offset_seconds: u64) -> Self { - Self(Instant::now() + Duration::from_secs(offset_seconds)) - } -} - #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ConnectionKey { diff --git a/aquatic_udp/src/lib/handlers.rs b/aquatic_udp/src/lib/handlers.rs index bc7e779..c1b1ad2 100644 --- a/aquatic_udp/src/lib/handlers.rs +++ b/aquatic_udp/src/lib/handlers.rs @@ -3,10 +3,10 @@ use std::time::Duration; use std::vec::Drain; use crossbeam_channel::{Sender, Receiver}; -use indexmap::IndexMap; use parking_lot::MutexGuard; use rand::{SeedableRng, Rng, rngs::{SmallRng, StdRng}}; +use aquatic_common::extract_response_peers; use bittorrent_udp::types::*; use crate::common::*; @@ -305,67 +305,6 @@ pub fn handle_scrape_requests( } -/// Extract response peers -/// -/// If there are more peers in map than `max_num_peers_to_take`, do a -/// half-random selection of peers from first and second halves of map, -/// in order to avoid returning too homogeneous peers. -/// -/// Don't care if we send back announcing peer. -#[inline] -pub fn extract_response_peers( - rng: &mut impl Rng, - peer_map: &IndexMap, - max_num_peers_to_take: usize, - peer_conversion_function: F -) -> Vec - where - K: Eq + ::std::hash::Hash, - F: Fn(&V) -> R - -{ - let peer_map_len = peer_map.len(); - - if peer_map_len <= max_num_peers_to_take { - peer_map.values() - .map(peer_conversion_function) - .collect() - } else { - let half_num_to_take = max_num_peers_to_take / 2; - let half_peer_map_len = peer_map_len / 2; - - let offset_first_half = rng.gen_range( - 0, - (half_peer_map_len + (peer_map_len % 2)) - half_num_to_take - ); - let offset_second_half = rng.gen_range( - half_peer_map_len, - peer_map_len - half_num_to_take - ); - - let end_first_half = offset_first_half + half_num_to_take; - let end_second_half = offset_second_half + half_num_to_take + (max_num_peers_to_take % 2); - - let mut peers: Vec = Vec::with_capacity(max_num_peers_to_take); - - for i in offset_first_half..end_first_half { - if let Some((_, peer)) = peer_map.get_index(i){ - peers.push(peer_conversion_function(peer)) - } - } - for i in offset_second_half..end_second_half { - if let Some((_, peer)) = peer_map.get_index(i){ - peers.push(peer_conversion_function(peer)) - } - } - - debug_assert_eq!(peers.len(), max_num_peers_to_take); - - peers - } -} - - #[inline] fn calc_max_num_peers_to_take( config: &Config, diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 27d175b..84fca9a 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -14,7 +14,7 @@ name = "aquatic_ws" path = "src/bin/main.rs" [dependencies] -aquatic_udp = { path = "../aquatic_udp" } +aquatic_common = { path = "../aquatic_common" } bittorrent_udp = { path = "../bittorrent_udp" } cli_helpers = { path = "../cli_helpers" } flume = "0.7" @@ -34,4 +34,4 @@ tungstenite = "0.10" [dev-dependencies] quickcheck = "0.9" -quickcheck_macros = "0.9" \ No newline at end of file +quickcheck_macros = "0.9" diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs index 7cd9cd2..69149d2 100644 --- a/aquatic_ws/src/lib/common.rs +++ b/aquatic_ws/src/lib/common.rs @@ -7,7 +7,7 @@ use indexmap::IndexMap; use parking_lot::Mutex; use mio::Token; -pub use aquatic_udp::common::ValidUntil; +pub use aquatic_common::ValidUntil; use crate::protocol::*; diff --git a/aquatic_ws/src/lib/handler.rs b/aquatic_ws/src/lib/handler.rs index fd2bd7f..da00a60 100644 --- a/aquatic_ws/src/lib/handler.rs +++ b/aquatic_ws/src/lib/handler.rs @@ -5,7 +5,7 @@ use hashbrown::HashMap; use parking_lot::MutexGuard; use rand::{Rng, SeedableRng, rngs::SmallRng}; -use aquatic_udp::handlers::extract_response_peers; +use aquatic_common::extract_response_peers; use crate::common::*; use crate::protocol::*;