From e3ce111548ac9b124317fb02f497e59287f6d6f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 20 Feb 2022 00:23:53 +0100 Subject: [PATCH 1/8] Reorganize and clean up TODO, add some entries --- TODO.md | 112 ++++++++++++++++++++++++-------------------------------- 1 file changed, 48 insertions(+), 64 deletions(-) diff --git a/TODO.md b/TODO.md index dc7f13c..884dfec 100644 --- a/TODO.md +++ b/TODO.md @@ -1,22 +1,39 @@ # TODO -* readme - * document privilege dropping and cpu pinning +## High priority + +## Medium priority + +* newer glommio versions might use SIGUSR1 internally, see glommio fe33e30 +* quit whole program if any thread panics +* implement socket_recv_size and ipv6_only in glommio implementations +* config: fail on unrecognized keys? +* Run cargo-deny in CI + +* aquatic_http: + * clean out connections regularly + * handle like in aquatic_ws + * Rc> which get set on successful request parsing and + successful response sending. Clone kept in connection slab which gets cleaned + periodically (= cancel tasks). Means that task handle will need to be stored in slab. + Config vars kill_idle_connections: bool, max_idle_connection_time. Remove keepalive. + * handle panicked/cancelled tasks? + +* aquatic_ws + * remove mio implementation when glommio issues fixed + * glommio + * RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity + * replacing indexmap_amortized / simd_json with equivalents doesn't help + * SinkExt::send maybe doesn't wake up properly? + * related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ? + +* extract_response_peers + * don't assume requesting peer is in list? + +## Low priority * config * add flag to print parsed config when starting - * fail on unrecognized keys - -* quit whole program if any thread panics - -* implement socket_recv_size and ipv6_only in glommio implementations - -* newer glommio versions might use SIGUSR1 internally, see glommio fe33e30 - -* CI - * file transfer CI for all implementations - * test access lists? - * cargo-deny * aquatic_udp * look at proper cpu pinning (check that one thread gets bound per core) @@ -27,19 +44,27 @@ * move additional request sending to for each received response, maybe with probability 0.2 +* aquatic_ws + * glommio + * proper cpu set pinning + * general + * large amount of temporary allocations in serialize_20_bytes, pretty many in deserialize_20_bytes + +* extract response peers: extract "one extra" to compensate for removal, + of sender if present in selection? + +# Not important + * aquatic_http: - * clean out connections regularly - * Rc> which get set on successful request parsing and - successful response sending. Clone kept in connection slab which gets cleaned - periodically (= cancel tasks). Means that task handle will need to be stored in slab. - Config vars kill_idle_connections: bool, max_idle_connection_time. Remove keepalive. - * handle panicked/cancelled tasks? * optimize? * get_peer_addr only once (takes 1.2% of runtime) * queue response: allocating takes 2.8% of runtime - * use futures-rustls for load test * consider better error type for request parsing, so that better error messages can be sent back (e.g., "full scrapes are not supported") + * test torrent transfer with real clients + * scrape: does it work (serialization etc), and with multiple hashes? + * 'left' optional in magnet requests? Probably not. Transmission sends huge + positive number. * aquatic_ws * mio @@ -51,49 +76,8 @@ * deregistering before closing is required by mio, but it hurts performance * blocked on https://github.com/snapview/tungstenite-rs/issues/51 * connection closing: send tls close message etc? - * glommio - * proper cpu set pinning - * RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity - * replacing indexmap_amortized / simd_json with equivalents doesn't help - * SinkExt::send maybe doesn't wake up properly? - * related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ? - * general - * large amount of temporary allocations in serialize_20_bytes, pretty many in deserialize_20_bytes - -* extract_response_peers - * don't assume requesting peer is in list - -# Less important - -* extract response peers: extract "one extra" to compensate for removal, - of sender if present in selection? maybe make criterion benchmark, - optimize - -## aquatic_http_load_test -* how handle large number of peers for "popular" torrents in keepalive mode? - maybe it is ok - -## aquatic_http -* test torrent transfer with real clients - * scrape: does it work (serialization etc), and with multiple hashes? - * 'left' optional in magnet requests? Probably not. Transmission sends huge - positive number. -* compact=0 should result in error response - -## aquatic_ws_load_test -* very small amount of connections means very small number of peers per - torrents, so tracker handling of large number is not really assessed - -# Not important - -## aquatic_ws -* write new version of extract_response_peers which checks for equality with - peer sending request? It could return an arrayvec or smallvec by the way - (but then the size needs to be adjusted together with the corresponding - config var, or the config var needs to be removed) - -## aquatic_cli_helpers -* Include config field comments in exported toml (likely quite a bit of work) + * write new version of extract_response_peers which checks for equality with + peer sending request??? # Don't do From f18fc52675e13ec803b230e66be9e135e150fd7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 20 Feb 2022 00:45:04 +0100 Subject: [PATCH 2/8] Run cargo fmt --- aquatic_cli_helpers/src/lib.rs | 2 +- aquatic_common/src/access_list.rs | 2 +- aquatic_common/src/cpu_pinning.rs | 2 +- aquatic_common/src/lib.rs | 18 ++++++++---------- aquatic_common/src/privileges.rs | 2 +- aquatic_http/src/config.rs | 2 +- aquatic_http_load_test/src/config.rs | 2 +- aquatic_toml_config/src/lib.rs | 2 +- aquatic_udp_bench/src/config.rs | 2 +- aquatic_ws_load_test/src/config.rs | 2 +- 10 files changed, 17 insertions(+), 19 deletions(-) diff --git a/aquatic_cli_helpers/src/lib.rs b/aquatic_cli_helpers/src/lib.rs index 8407361..a6598c1 100644 --- a/aquatic_cli_helpers/src/lib.rs +++ b/aquatic_cli_helpers/src/lib.rs @@ -2,9 +2,9 @@ use std::fs::File; use std::io::Read; use anyhow::Context; +use aquatic_toml_config::TomlConfig; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use simplelog::{ColorChoice, ConfigBuilder, LevelFilter, TermLogger, TerminalMode}; -use aquatic_toml_config::TomlConfig; /// Log level. Available values are off, error, warn, info, debug and trace. #[derive(Debug, Clone, Copy, PartialEq, TomlConfig, Serialize, Deserialize)] diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index 3b123cf..2eebc4b 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -4,10 +4,10 @@ use std::path::PathBuf; use std::sync::Arc; use anyhow::Context; +use aquatic_toml_config::TomlConfig; use arc_swap::{ArcSwap, Cache}; use hashbrown::HashSet; use serde::{Deserialize, Serialize}; -use aquatic_toml_config::TomlConfig; /// Access list mode. Available modes are white, black and off. #[derive(Clone, Copy, Debug, PartialEq, TomlConfig, Serialize, Deserialize)] diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs index 8668433..6f0a9d9 100644 --- a/aquatic_common/src/cpu_pinning.rs +++ b/aquatic_common/src/cpu_pinning.rs @@ -1,6 +1,6 @@ +use aquatic_toml_config::TomlConfig; use hwloc::{CpuSet, ObjectType, Topology, CPUBIND_THREAD}; use serde::{Deserialize, Serialize}; -use aquatic_toml_config::TomlConfig; #[derive(Clone, Debug, PartialEq, TomlConfig, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index a1c5307..17c9f24 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -52,16 +52,14 @@ where if peer_map_len <= max_num_peers_to_take + 1 { let mut peers = Vec::with_capacity(peer_map_len); - peers.extend(peer_map - .iter() - .filter_map(|(k, v)| { - if *k == sender_peer_map_key { - None - } else { - Some(peer_conversion_function(v)) - } - })); - + peers.extend(peer_map.iter().filter_map(|(k, v)| { + if *k == sender_peer_map_key { + None + } else { + Some(peer_conversion_function(v)) + } + })); + peers } else { let half_num_to_take = max_num_peers_to_take / 2; diff --git a/aquatic_common/src/privileges.rs b/aquatic_common/src/privileges.rs index 3910c3d..0e4a627 100644 --- a/aquatic_common/src/privileges.rs +++ b/aquatic_common/src/privileges.rs @@ -6,9 +6,9 @@ use std::{ time::Duration, }; +use aquatic_toml_config::TomlConfig; use privdrop::PrivDrop; use serde::Deserialize; -use aquatic_toml_config::TomlConfig; #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[serde(default)] diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index 66795b7..095acce 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -1,8 +1,8 @@ use std::{net::SocketAddr, path::PathBuf}; use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig}; -use serde::Deserialize; use aquatic_toml_config::TomlConfig; +use serde::Deserialize; use aquatic_cli_helpers::LogLevel; diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index 4a4721f..6f88e79 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -1,8 +1,8 @@ use std::net::SocketAddr; use aquatic_cli_helpers::LogLevel; -use serde::Deserialize; use aquatic_toml_config::TomlConfig; +use serde::Deserialize; /// aquatic_http_load_test configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] diff --git a/aquatic_toml_config/src/lib.rs b/aquatic_toml_config/src/lib.rs index eb3b9a8..42745d3 100644 --- a/aquatic_toml_config/src/lib.rs +++ b/aquatic_toml_config/src/lib.rs @@ -1,5 +1,5 @@ -pub use toml; pub use aquatic_toml_config_derive::TomlConfig; +pub use toml; /// Run this on your struct implementing TomlConfig to generate a /// serialization/deserialization test for it. diff --git a/aquatic_udp_bench/src/config.rs b/aquatic_udp_bench/src/config.rs index 89480d5..2b4b2f8 100644 --- a/aquatic_udp_bench/src/config.rs +++ b/aquatic_udp_bench/src/config.rs @@ -1,5 +1,5 @@ -use serde::Deserialize; use aquatic_toml_config::TomlConfig; +use serde::Deserialize; #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] pub struct BenchConfig { diff --git a/aquatic_ws_load_test/src/config.rs b/aquatic_ws_load_test/src/config.rs index 334c61f..258423b 100644 --- a/aquatic_ws_load_test/src/config.rs +++ b/aquatic_ws_load_test/src/config.rs @@ -3,8 +3,8 @@ use std::net::SocketAddr; use aquatic_cli_helpers::LogLevel; #[cfg(feature = "cpu-pinning")] use aquatic_common::cpu_pinning::CpuPinningConfig; -use serde::Deserialize; use aquatic_toml_config::TomlConfig; +use serde::Deserialize; /// aquatic_ws_load_test configuration #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] From 8e7f8425f9882ef0cc35ef518c0fc40828647f66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 20 Feb 2022 00:48:58 +0100 Subject: [PATCH 3/8] udp: default to 120s max_connection_age, since it is in BEP0015 --- aquatic_udp/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 42297c9..e90e2e7 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -188,7 +188,7 @@ impl Default for CleaningConfig { connection_cleaning_interval: 60, torrent_cleaning_interval: 60 * 2, pending_scrape_cleaning_interval: 60 * 10, - max_connection_age: 60 * 5, + max_connection_age: 60 * 2, max_peer_age: 60 * 20, max_pending_scrape_age: 60, } From 903010dbe14c0b8b066b7081cf3cfb3bf44a25b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 20 Feb 2022 00:59:07 +0100 Subject: [PATCH 4/8] udp: use action 1 for IPv6 announce responses; refactor protocol code According to BEP015, action 1 is to be used. --- README.md | 1 - aquatic_udp/src/common.rs | 26 +--- aquatic_udp/src/workers/request.rs | 91 +++---------- aquatic_udp_load_test/src/worker/mod.rs | 2 +- aquatic_udp_protocol/src/common.rs | 30 ++--- aquatic_udp_protocol/src/response.rs | 162 ++++++++++-------------- 6 files changed, 99 insertions(+), 213 deletions(-) diff --git a/README.md b/README.md index a9734b8..346fa84 100644 --- a/README.md +++ b/README.md @@ -120,7 +120,6 @@ Implements: * Doesn't care about IP addresses sent in announce requests. The packet source IP is always used. * Doesn't track of the number of torrent downloads (0 is always sent). - * [IPv6 support](https://web.archive.org/web/20170503181830/http://opentracker.blog.h3q.com/2007/12/28/the-ipv6-situation/) IPv4 and IPv6 peers are tracked separately. diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 693f31d..36affe7 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; use std::hash::Hash; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::net::{Ipv4Addr, Ipv6Addr}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -14,22 +14,6 @@ use crate::config::Config; pub const MAX_PACKET_SIZE: usize = 8192; -pub trait Ip: Hash + PartialEq + Eq + Clone + Copy { - fn ip_addr(self) -> IpAddr; -} - -impl Ip for Ipv4Addr { - fn ip_addr(self) -> IpAddr { - IpAddr::V4(self) - } -} - -impl Ip for Ipv6Addr { - fn ip_addr(self) -> IpAddr { - IpAddr::V6(self) - } -} - #[derive(Debug)] pub struct PendingScrapeRequest { pub slab_key: usize, @@ -50,8 +34,8 @@ pub enum ConnectedRequest { #[derive(Debug)] pub enum ConnectedResponse { - AnnounceIpv4(AnnounceResponseIpv4), - AnnounceIpv6(AnnounceResponseIpv6), + AnnounceIpv4(AnnounceResponse), + AnnounceIpv6(AnnounceResponse), Scrape(PendingScrapeResponse), } @@ -234,14 +218,14 @@ mod tests { let config = Config::default(); - let peers = ::std::iter::repeat(ResponsePeerIpv6 { + let peers = ::std::iter::repeat(ResponsePeer { ip_address: Ipv6Addr::new(1, 1, 1, 1, 1, 1, 1, 1), port: Port(1), }) .take(config.protocol.max_response_peers) .collect(); - let response = Response::AnnounceIpv6(AnnounceResponseIpv6 { + let response = Response::AnnounceIpv6(AnnounceResponse { transaction_id: TransactionId(1), announce_interval: AnnounceInterval(1), seeders: NumberOfPeers(1), diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index 590f3e3..4c013eb 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -30,6 +30,15 @@ struct Peer { pub valid_until: ValidUntil, } +impl Peer { + fn to_response_peer(&self) -> ResponsePeer { + ResponsePeer { + ip_address: self.ip_address, + port: self.port, + } + } +} + type PeerMap = AHashIndexMap>; struct TorrentData { @@ -111,68 +120,6 @@ impl TorrentMaps { } } -#[derive(Clone, PartialEq, Debug)] -pub struct ProtocolResponsePeer { - pub ip_address: I, - pub port: Port, -} - -impl ProtocolResponsePeer { - #[inline(always)] - fn from_peer(peer: &Peer) -> Self { - Self { - ip_address: peer.ip_address, - port: peer.port, - } - } -} - -pub struct ProtocolAnnounceResponse { - pub transaction_id: TransactionId, - pub announce_interval: AnnounceInterval, - pub leechers: NumberOfPeers, - pub seeders: NumberOfPeers, - pub peers: Vec>, -} - -impl Into for ProtocolAnnounceResponse { - fn into(self) -> ConnectedResponse { - ConnectedResponse::AnnounceIpv4(AnnounceResponseIpv4 { - transaction_id: self.transaction_id, - announce_interval: self.announce_interval, - leechers: self.leechers, - seeders: self.seeders, - peers: self - .peers - .into_iter() - .map(|peer| ResponsePeerIpv4 { - ip_address: peer.ip_address, - port: peer.port, - }) - .collect(), - }) - } -} - -impl Into for ProtocolAnnounceResponse { - fn into(self) -> ConnectedResponse { - ConnectedResponse::AnnounceIpv6(AnnounceResponseIpv6 { - transaction_id: self.transaction_id, - announce_interval: self.announce_interval, - leechers: self.leechers, - seeders: self.seeders, - peers: self - .peers - .into_iter() - .map(|peer| ResponsePeerIpv6 { - ip_address: peer.ip_address, - port: peer.port, - }) - .collect(), - }) - } -} - pub fn run_request_worker( config: Config, state: State, @@ -258,24 +205,22 @@ fn handle_announce_request( peer_valid_until: ValidUntil, ) -> ConnectedResponse { match src.get().ip() { - IpAddr::V4(ip) => handle_announce_request_inner( + IpAddr::V4(ip) => ConnectedResponse::AnnounceIpv4(handle_announce_request_inner( config, rng, &mut torrents.ipv4, request, ip, peer_valid_until, - ) - .into(), - IpAddr::V6(ip) => handle_announce_request_inner( + )), + IpAddr::V6(ip) => ConnectedResponse::AnnounceIpv6(handle_announce_request_inner( config, rng, &mut torrents.ipv6, request, ip, peer_valid_until, - ) - .into(), + )), } } @@ -286,7 +231,7 @@ fn handle_announce_request_inner( request: AnnounceRequest, peer_ip: I, peer_valid_until: ValidUntil, -) -> ProtocolAnnounceResponse { +) -> AnnounceResponse { let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left); let peer = Peer { @@ -329,10 +274,10 @@ fn handle_announce_request_inner( &torrent_data.peers, max_num_peers_to_take, request.peer_id, - ProtocolResponsePeer::from_peer, + Peer::to_response_peer, ); - ProtocolAnnounceResponse { + AnnounceResponse { transaction_id: request.transaction_id, announce_interval: AnnounceInterval(config.protocol.peer_announce_interval), leechers: NumberOfPeers(torrent_data.num_leechers as i32), @@ -448,7 +393,7 @@ mod tests { if i == 0 { opt_sender_key = Some(key); - opt_sender_peer = Some(ProtocolResponsePeer::from_peer(&peer)); + opt_sender_peer = Some(peer.to_response_peer()); } peer_map.insert(key, peer); @@ -461,7 +406,7 @@ mod tests { &peer_map, req_num_peers, opt_sender_key.unwrap_or_else(|| gen_peer_id(1)), - ProtocolResponsePeer::from_peer, + Peer::to_response_peer, ); // Check that number of returned peers is correct diff --git a/aquatic_udp_load_test/src/worker/mod.rs b/aquatic_udp_load_test/src/worker/mod.rs index 816dafc..40b0a62 100644 --- a/aquatic_udp_load_test/src/worker/mod.rs +++ b/aquatic_udp_load_test/src/worker/mod.rs @@ -56,7 +56,7 @@ pub fn run_worker_thread( for event in events.iter() { if (event.token() == token) & event.is_readable() { while let Ok(amt) = socket.recv(&mut buffer) { - match Response::from_bytes(&buffer[0..amt]) { + match Response::from_bytes(&buffer[0..amt], addr.is_ipv4()) { Ok(response) => { match response { Response::AnnounceIpv4(ref r) => { diff --git a/aquatic_udp_protocol/src/common.rs b/aquatic_udp_protocol/src/common.rs index 77192c6..8aab39f 100644 --- a/aquatic_udp_protocol/src/common.rs +++ b/aquatic_udp_protocol/src/common.rs @@ -1,5 +1,11 @@ +use std::fmt::Debug; use std::net::{Ipv4Addr, Ipv6Addr}; +pub trait Ip: Clone + Copy + Debug + PartialEq + Eq {} + +impl Ip for Ipv4Addr {} +impl Ip for Ipv6Addr {} + #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct AnnounceInterval(pub i32); @@ -30,15 +36,9 @@ pub struct PeerId(pub [u8; 20]); #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub struct PeerKey(pub u32); -#[derive(Hash, PartialEq, Eq, Clone, Debug)] -pub struct ResponsePeerIpv4 { - pub ip_address: Ipv4Addr, - pub port: Port, -} - -#[derive(Hash, PartialEq, Eq, Clone, Debug)] -pub struct ResponsePeerIpv6 { - pub ip_address: Ipv6Addr, +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct ResponsePeer { + pub ip_address: I, pub port: Port, } @@ -69,17 +69,7 @@ impl quickcheck::Arbitrary for PeerId { } #[cfg(test)] -impl quickcheck::Arbitrary for ResponsePeerIpv4 { - fn arbitrary(g: &mut quickcheck::Gen) -> Self { - Self { - ip_address: quickcheck::Arbitrary::arbitrary(g), - port: Port(u16::arbitrary(g).into()), - } - } -} - -#[cfg(test)] -impl quickcheck::Arbitrary for ResponsePeerIpv6 { +impl quickcheck::Arbitrary for ResponsePeer { fn arbitrary(g: &mut quickcheck::Gen) -> Self { Self { ip_address: quickcheck::Arbitrary::arbitrary(g), diff --git a/aquatic_udp_protocol/src/response.rs b/aquatic_udp_protocol/src/response.rs index 8e9a280..634724f 100644 --- a/aquatic_udp_protocol/src/response.rs +++ b/aquatic_udp_protocol/src/response.rs @@ -21,21 +21,12 @@ pub struct ConnectResponse { } #[derive(PartialEq, Eq, Clone, Debug)] -pub struct AnnounceResponseIpv4 { +pub struct AnnounceResponse { pub transaction_id: TransactionId, pub announce_interval: AnnounceInterval, pub leechers: NumberOfPeers, pub seeders: NumberOfPeers, - pub peers: Vec, -} - -#[derive(PartialEq, Eq, Clone, Debug)] -pub struct AnnounceResponseIpv6 { - pub transaction_id: TransactionId, - pub announce_interval: AnnounceInterval, - pub leechers: NumberOfPeers, - pub seeders: NumberOfPeers, - pub peers: Vec, + pub peers: Vec>, } #[derive(PartialEq, Eq, Clone, Debug)] @@ -53,8 +44,8 @@ pub struct ErrorResponse { #[derive(PartialEq, Eq, Clone, Debug)] pub enum Response { Connect(ConnectResponse), - AnnounceIpv4(AnnounceResponseIpv4), - AnnounceIpv6(AnnounceResponseIpv6), + AnnounceIpv4(AnnounceResponse), + AnnounceIpv6(AnnounceResponse), Scrape(ScrapeResponse), Error(ErrorResponse), } @@ -65,14 +56,14 @@ impl From for Response { } } -impl From for Response { - fn from(r: AnnounceResponseIpv4) -> Self { +impl From> for Response { + fn from(r: AnnounceResponse) -> Self { Self::AnnounceIpv4(r) } } -impl From for Response { - fn from(r: AnnounceResponseIpv6) -> Self { +impl From> for Response { + fn from(r: AnnounceResponse) -> Self { Self::AnnounceIpv6(r) } } @@ -90,12 +81,6 @@ impl From for Response { } impl Response { - /// Returning IPv6 peers doesn't really work with UDP. It is not supported - /// by https://libtorrent.org/udp_tracker_protocol.html. There is a - /// suggestion in https://web.archive.org/web/20170503181830/http://opentracker.blog.h3q.com/2007/12/28/the-ipv6-situation/ - /// of using action number 4 and returning IPv6 octets just like for IPv4 - /// addresses. Clients seem not to support it very well, but due to a lack - /// of alternative solutions, it is implemented here. #[inline] pub fn write(&self, bytes: &mut impl Write) -> Result<(), io::Error> { match self { @@ -116,6 +101,18 @@ impl Response { bytes.write_u16::(peer.port.0)?; } } + Response::AnnounceIpv6(r) => { + bytes.write_i32::(1)?; + bytes.write_i32::(r.transaction_id.0)?; + bytes.write_i32::(r.announce_interval.0)?; + bytes.write_i32::(r.leechers.0)?; + bytes.write_i32::(r.seeders.0)?; + + for peer in r.peers.iter() { + bytes.write_all(&peer.ip_address.octets())?; + bytes.write_u16::(peer.port.0)?; + } + } Response::Scrape(r) => { bytes.write_i32::(2)?; bytes.write_i32::(r.transaction_id.0)?; @@ -132,25 +129,13 @@ impl Response { bytes.write_all(r.message.as_bytes())?; } - Response::AnnounceIpv6(r) => { - bytes.write_i32::(4)?; - bytes.write_i32::(r.transaction_id.0)?; - bytes.write_i32::(r.announce_interval.0)?; - bytes.write_i32::(r.leechers.0)?; - bytes.write_i32::(r.seeders.0)?; - - for peer in r.peers.iter() { - bytes.write_all(&peer.ip_address.octets())?; - bytes.write_u16::(peer.port.0)?; - } - } } Ok(()) } #[inline] - pub fn from_bytes(bytes: &[u8]) -> Result { + pub fn from_bytes(bytes: &[u8], ipv4: bool) -> Result { let mut cursor = Cursor::new(bytes); let action = cursor.read_i32::()?; @@ -168,7 +153,7 @@ impl Response { .into()) } // Announce - 1 => { + 1 if ipv4 => { let announce_interval = cursor.read_i32::()?; let leechers = cursor.read_i32::()?; let seeders = cursor.read_i32::()?; @@ -183,14 +168,45 @@ impl Response { let ip_address = Ipv4Addr::from(ip_bytes); let port = (&chunk[4..]).read_u16::().unwrap(); - ResponsePeerIpv4 { + ResponsePeer { ip_address, port: Port(port), } }) .collect(); - Ok((AnnounceResponseIpv4 { + Ok((AnnounceResponse { + transaction_id: TransactionId(transaction_id), + announce_interval: AnnounceInterval(announce_interval), + leechers: NumberOfPeers(leechers), + seeders: NumberOfPeers(seeders), + peers, + }) + .into()) + } + 1 if !ipv4 => { + let announce_interval = cursor.read_i32::()?; + let leechers = cursor.read_i32::()?; + let seeders = cursor.read_i32::()?; + + let position = cursor.position() as usize; + let inner = cursor.into_inner(); + + let peers = inner[position..] + .chunks_exact(18) + .map(|chunk| { + let ip_bytes: [u8; 16] = (&chunk[..16]).try_into().unwrap(); + let ip_address = Ipv6Addr::from(ip_bytes); + let port = (&chunk[16..]).read_u16::().unwrap(); + + ResponsePeer { + ip_address, + port: Port(port), + } + }) + .collect(); + + Ok((AnnounceResponse { transaction_id: TransactionId(transaction_id), announce_interval: AnnounceInterval(announce_interval), leechers: NumberOfPeers(leechers), @@ -240,38 +256,6 @@ impl Response { }) .into()) } - // IPv6 announce - 4 => { - let announce_interval = cursor.read_i32::()?; - let leechers = cursor.read_i32::()?; - let seeders = cursor.read_i32::()?; - - let position = cursor.position() as usize; - let inner = cursor.into_inner(); - - let peers = inner[position..] - .chunks_exact(18) - .map(|chunk| { - let ip_bytes: [u8; 16] = (&chunk[..16]).try_into().unwrap(); - let ip_address = Ipv6Addr::from(ip_bytes); - let port = (&chunk[16..]).read_u16::().unwrap(); - - ResponsePeerIpv6 { - ip_address, - port: Port(port), - } - }) - .collect(); - - Ok((AnnounceResponseIpv6 { - transaction_id: TransactionId(transaction_id), - announce_interval: AnnounceInterval(announce_interval), - leechers: NumberOfPeers(leechers), - seeders: NumberOfPeers(seeders), - peers, - }) - .into()) - } _ => Ok((ErrorResponse { transaction_id: TransactionId(transaction_id), message: "Invalid action".into(), @@ -306,26 +290,10 @@ mod tests { } } - impl quickcheck::Arbitrary for AnnounceResponseIpv4 { + impl quickcheck::Arbitrary for AnnounceResponse { fn arbitrary(g: &mut quickcheck::Gen) -> Self { let peers = (0..u8::arbitrary(g)) - .map(|_| ResponsePeerIpv4::arbitrary(g)) - .collect(); - - Self { - transaction_id: TransactionId(i32::arbitrary(g)), - announce_interval: AnnounceInterval(i32::arbitrary(g)), - leechers: NumberOfPeers(i32::arbitrary(g)), - seeders: NumberOfPeers(i32::arbitrary(g)), - peers, - } - } - } - - impl quickcheck::Arbitrary for AnnounceResponseIpv6 { - fn arbitrary(g: &mut quickcheck::Gen) -> Self { - let peers = (0..u8::arbitrary(g)) - .map(|_| ResponsePeerIpv6::arbitrary(g)) + .map(|_| ResponsePeer::arbitrary(g)) .collect(); Self { @@ -351,11 +319,11 @@ mod tests { } } - fn same_after_conversion(response: Response) -> bool { + fn same_after_conversion(response: Response, ipv4: bool) -> bool { let mut buf = Vec::new(); response.clone().write(&mut buf).unwrap(); - let r2 = Response::from_bytes(&buf[..]).unwrap(); + let r2 = Response::from_bytes(&buf[..], ipv4).unwrap(); let success = response == r2; @@ -368,21 +336,21 @@ mod tests { #[quickcheck] fn test_connect_response_convert_identity(response: ConnectResponse) -> bool { - same_after_conversion(response.into()) + same_after_conversion(response.into(), true) } #[quickcheck] - fn test_announce_response_ipv4_convert_identity(response: AnnounceResponseIpv4) -> bool { - same_after_conversion(response.into()) + fn test_announce_response_ipv4_convert_identity(response: AnnounceResponse) -> bool { + same_after_conversion(response.into(), true) } #[quickcheck] - fn test_announce_response_ipv6_convert_identity(response: AnnounceResponseIpv6) -> bool { - same_after_conversion(response.into()) + fn test_announce_response_ipv6_convert_identity(response: AnnounceResponse) -> bool { + same_after_conversion(response.into(), false) } #[quickcheck] fn test_scrape_response_convert_identity(response: ScrapeResponse) -> bool { - same_after_conversion(response.into()) + same_after_conversion(response.into(), true) } } From 3785e57513c85226b774e7e3558c989bcd640c55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 20 Feb 2022 01:44:34 +0100 Subject: [PATCH 5/8] Sort dependencies in Cargo.toml files --- aquatic_cli_helpers/Cargo.toml | 3 ++- aquatic_common/Cargo.toml | 3 ++- aquatic_http/Cargo.toml | 5 +++-- aquatic_http_load_test/Cargo.toml | 5 +++-- aquatic_udp/Cargo.toml | 9 +++++---- aquatic_udp_bench/Cargo.toml | 7 ++++--- aquatic_udp_load_test/Cargo.toml | 7 ++++--- aquatic_ws/Cargo.toml | 5 +++-- aquatic_ws_load_test/Cargo.toml | 5 +++-- 9 files changed, 29 insertions(+), 20 deletions(-) diff --git a/aquatic_cli_helpers/Cargo.toml b/aquatic_cli_helpers/Cargo.toml index 4b77eca..853b635 100644 --- a/aquatic_cli_helpers/Cargo.toml +++ b/aquatic_cli_helpers/Cargo.toml @@ -8,8 +8,9 @@ description = "aquatic BitTorrent tracker CLI helpers" repository = "https://github.com/greatest-ape/aquatic" [dependencies] +aquatic_toml_config = "0.1.0" + anyhow = "1" serde = { version = "1", features = ["derive"] } simplelog = "0.11" toml = "0.5" -aquatic_toml_config = "0.1.0" diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml index 446666e..9e81393 100644 --- a/aquatic_common/Cargo.toml +++ b/aquatic_common/Cargo.toml @@ -14,6 +14,8 @@ name = "aquatic_common" cpu-pinning = ["hwloc", "libc"] [dependencies] +aquatic_toml_config = "0.1.0" + ahash = "0.7" anyhow = "1" arc-swap = "1" @@ -24,7 +26,6 @@ log = "0.4" privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } -aquatic_toml_config = "0.1.0" # cpu-pinning hwloc = { version = "0.5", optional = true } diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index 00e9a9f..9bdae8c 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -17,10 +17,12 @@ name = "aquatic_http" cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] -anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_http_protocol = "0.1.0" +aquatic_toml_config = "0.1.0" + +anyhow = "1" cfg-if = "1" either = "1" futures-lite = "1" @@ -38,7 +40,6 @@ serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" smartstring = "0.2" -aquatic_toml_config = "0.1.0" [dev-dependencies] quickcheck = "1" diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index c6ae2d8..34352a6 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -13,10 +13,12 @@ name = "aquatic_http_load_test" cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] -anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_http_protocol = "0.1.0" +aquatic_toml_config = "0.1.0" + +anyhow = "1" futures-lite = "1" hashbrown = "0.12" glommio = "0.7" @@ -26,7 +28,6 @@ rand = { version = "0.8", features = ["small_rng"] } rand_distr = "0.4" rustls = { version = "0.20", features = ["dangerous_configuration"] } serde = { version = "1", features = ["derive"] } -aquatic_toml_config = "0.1.0" [dev-dependencies] quickcheck = "1" diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 53e42d5..7acb60d 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -17,12 +17,14 @@ name = "aquatic_udp" cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] -anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" +aquatic_toml_config = "0.1.0" aquatic_udp_protocol = "0.1.0" -chrono = "0.4" + +anyhow = "1" cfg-if = "1" +chrono = "0.4" crossbeam-channel = "0.5" hex = "0.4" log = "0.4" @@ -31,11 +33,10 @@ mio = { version = "0.8", features = ["net", "os-poll"] } num-format = "0.4" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } -slab = "0.4" signal-hook = { version = "0.3" } +slab = "0.4" socket2 = { version = "0.4", features = ["all"] } tinytemplate = "1" -aquatic_toml_config = "0.1.0" [dev-dependencies] quickcheck = "1" diff --git a/aquatic_udp_bench/Cargo.toml b/aquatic_udp_bench/Cargo.toml index b5ee6ca..b7c4bf6 100644 --- a/aquatic_udp_bench/Cargo.toml +++ b/aquatic_udp_bench/Cargo.toml @@ -10,16 +10,17 @@ repository = "https://github.com/greatest-ape/aquatic" name = "aquatic_udp_bench" [dependencies] -anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" +aquatic_toml_config = "0.1.0" aquatic_udp = "0.1.0" aquatic_udp_protocol = "0.1.0" + +anyhow = "1" crossbeam-channel = "0.5" indicatif = "0.16" mimalloc = { version = "0.1", default-features = false } num-format = "0.4" -rand = { version = "0.8", features = ["small_rng"] } rand_distr = "0.4" +rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } -aquatic_toml_config = "0.1.0" diff --git a/aquatic_udp_load_test/Cargo.toml b/aquatic_udp_load_test/Cargo.toml index 8c271e5..5656069 100644 --- a/aquatic_udp_load_test/Cargo.toml +++ b/aquatic_udp_load_test/Cargo.toml @@ -13,18 +13,19 @@ name = "aquatic_udp_load_test" cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] -anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" +aquatic_toml_config = "0.1.0" aquatic_udp_protocol = "0.1.0" + +anyhow = "1" hashbrown = "0.12" mimalloc = { version = "0.1", default-features = false } mio = { version = "0.8", features = ["net", "os-poll"] } -rand = { version = "0.8", features = ["small_rng"] } rand_distr = "0.4" +rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } socket2 = { version = "0.4", features = ["all"] } -aquatic_toml_config = "0.1.0" [dev-dependencies] quickcheck = "1" diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 3b8654a..255cfa0 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -20,10 +20,12 @@ with-glommio = ["cpu-pinning", "async-tungstenite", "futures-lite", "futures", " with-mio = ["crossbeam-channel", "histogram", "mio", "parking_lot", "socket2"] [dependencies] -anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" +aquatic_toml_config = "0.1.0" aquatic_ws_protocol = "0.1.0" + +anyhow = "1" cfg-if = "1" either = "1" hashbrown = { version = "0.12", features = ["serde"] } @@ -36,7 +38,6 @@ rustls-pemfile = "0.3" serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" -aquatic_toml_config = "0.1.0" tungstenite = "0.17" # mio diff --git a/aquatic_ws_load_test/Cargo.toml b/aquatic_ws_load_test/Cargo.toml index 6917e51..b1c92e1 100644 --- a/aquatic_ws_load_test/Cargo.toml +++ b/aquatic_ws_load_test/Cargo.toml @@ -13,11 +13,13 @@ name = "aquatic_ws_load_test" cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] -anyhow = "1" async-tungstenite = "0.17" aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" +aquatic_toml_config = "0.1.0" aquatic_ws_protocol = "0.1.0" + +anyhow = "1" futures = "0.3" futures-rustls = "0.22" glommio = "0.7" @@ -28,7 +30,6 @@ rand_distr = "0.4" rustls = { version = "0.20", features = ["dangerous_configuration"] } serde = { version = "1", features = ["derive"] } serde_json = "1" -aquatic_toml_config = "0.1.0" tungstenite = "0.17" [dev-dependencies] From e938351a9e67cacee2c77a110b9f20b25b70796d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 20 Feb 2022 01:47:03 +0100 Subject: [PATCH 6/8] udp: set default max_scrape_torrents=70, max_response_peers=50 --- aquatic_udp/src/config.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index e90e2e7..6e23599 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -123,8 +123,8 @@ pub struct ProtocolConfig { impl Default for ProtocolConfig { fn default() -> Self { Self { - max_scrape_torrents: 255, - max_response_peers: 255, + max_scrape_torrents: 70, + max_response_peers: 50, peer_announce_interval: 60 * 15, } } From e9ced0854728c38b0c6d5713e1949e543cc41c3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 20 Feb 2022 01:58:00 +0100 Subject: [PATCH 7/8] README: fix typos --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 346fa84..5ef5e7b 100644 --- a/README.md +++ b/README.md @@ -119,7 +119,7 @@ Implements: * [BEP 015]: UDP BitTorrent tracker protocol ([more details](https://libtorrent.org/udp_tracker_protocol.html)). Exceptions: * Doesn't care about IP addresses sent in announce requests. The packet source IP is always used. - * Doesn't track of the number of torrent downloads (0 is always sent). + * Doesn't track the number of torrent downloads (0 is always sent). IPv4 and IPv6 peers are tracked separately. @@ -148,7 +148,7 @@ More details are available [here](./documents/aquatic-udp-load-test-2021-11-28.p Implements: * [BEP 003]: HTTP BitTorrent protocol ([more details](https://wiki.theory.org/index.php/BitTorrentSpecification#Tracker_HTTP.2FHTTPS_Protocol)). Exceptions: * Only runs over TLS - * Doesn't track of the number of torrent downloads (0 is always sent) + * Doesn't track the number of torrent downloads (0 is always sent) * Only compact responses are supported * [BEP 023]: Compact HTTP responses * [BEP 007]: IPv6 support @@ -166,7 +166,7 @@ Aims for compatibility with [WebTorrent](https://github.com/webtorrent) clients. Notes: * Only runs over TLS - * Doesn't track of the number of torrent downloads (0 is always sent). + * Doesn't track the number of torrent downloads (0 is always sent). * Doesn't allow full scrapes, i.e. of all registered info hashes IPv4 and IPv6 peers are tracked separately. From 0f60ffbb75edf1f0f0eccfd6051f8a3a5d0c23ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 20 Feb 2022 12:07:28 +0100 Subject: [PATCH 8/8] udp load test: minor performance improvements --- .../src/worker/request_gen.rs | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/aquatic_udp_load_test/src/worker/request_gen.rs b/aquatic_udp_load_test/src/worker/request_gen.rs index 98091c8..ce99889 100644 --- a/aquatic_udp_load_test/src/worker/request_gen.rs +++ b/aquatic_udp_load_test/src/worker/request_gen.rs @@ -114,21 +114,21 @@ fn create_random_request( transaction_id: TransactionId, torrent_peer: &TorrentPeer, ) -> Request { - let weights = vec![ - config.requests.weight_announce as u32, - config.requests.weight_connect as u32, - config.requests.weight_scrape as u32, - ]; - - let items = vec![ + const ITEMS: [RequestType; 3] = [ RequestType::Announce, RequestType::Connect, RequestType::Scrape, ]; - let dist = WeightedIndex::new(&weights).expect("random request weighted index"); + let weights = [ + config.requests.weight_announce as u32, + config.requests.weight_connect as u32, + config.requests.weight_scrape as u32, + ]; - match items[dist.sample(rng)] { + let dist = WeightedIndex::new(weights).expect("random request weighted index"); + + match ITEMS[dist.sample(rng)] { RequestType::Announce => create_announce_request(config, rng, torrent_peer, transaction_id), RequestType::Connect => create_connect_request(transaction_id), RequestType::Scrape => create_scrape_request(&info_hashes, torrent_peer, transaction_id), @@ -209,7 +209,7 @@ fn create_torrent_peer( scrape_hash_indeces, connection_id, peer_id: generate_peer_id(), - port: Port(rand::random()), + port: Port(rng.gen()), } }