Merge pull request #52 from greatest-ape/work-2022-02-20

fix udp ipv6 issues; other improvements
This commit is contained in:
Joakim Frostegård 2022-02-20 12:52:22 +01:00 committed by GitHub
commit 065e007ede
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 209 additions and 332 deletions

View file

@ -119,8 +119,7 @@ Implements:
* [BEP 015]: UDP BitTorrent tracker protocol ([more details](https://libtorrent.org/udp_tracker_protocol.html)). Exceptions:
* Doesn't care about IP addresses sent in announce requests. The packet
source IP is always used.
* Doesn't track of the number of torrent downloads (0 is always sent).
* [IPv6 support](https://web.archive.org/web/20170503181830/http://opentracker.blog.h3q.com/2007/12/28/the-ipv6-situation/)
* Doesn't track the number of torrent downloads (0 is always sent).
IPv4 and IPv6 peers are tracked separately.
@ -149,7 +148,7 @@ More details are available [here](./documents/aquatic-udp-load-test-2021-11-28.p
Implements:
* [BEP 003]: HTTP BitTorrent protocol ([more details](https://wiki.theory.org/index.php/BitTorrentSpecification#Tracker_HTTP.2FHTTPS_Protocol)). Exceptions:
* Only runs over TLS
* Doesn't track of the number of torrent downloads (0 is always sent)
* Doesn't track the number of torrent downloads (0 is always sent)
* Only compact responses are supported
* [BEP 023]: Compact HTTP responses
* [BEP 007]: IPv6 support
@ -167,7 +166,7 @@ Aims for compatibility with [WebTorrent](https://github.com/webtorrent)
clients. Notes:
* Only runs over TLS
* Doesn't track of the number of torrent downloads (0 is always sent).
* Doesn't track the number of torrent downloads (0 is always sent).
* Doesn't allow full scrapes, i.e. of all registered info hashes
IPv4 and IPv6 peers are tracked separately.

112
TODO.md
View file

@ -1,22 +1,39 @@
# TODO
* readme
* document privilege dropping and cpu pinning
## High priority
## Medium priority
* newer glommio versions might use SIGUSR1 internally, see glommio fe33e30
* quit whole program if any thread panics
* implement socket_recv_size and ipv6_only in glommio implementations
* config: fail on unrecognized keys?
* Run cargo-deny in CI
* aquatic_http:
* clean out connections regularly
* handle like in aquatic_ws
* Rc<RefCell<ValidUntil>> which get set on successful request parsing and
successful response sending. Clone kept in connection slab which gets cleaned
periodically (= cancel tasks). Means that task handle will need to be stored in slab.
Config vars kill_idle_connections: bool, max_idle_connection_time. Remove keepalive.
* handle panicked/cancelled tasks?
* aquatic_ws
* remove mio implementation when glommio issues fixed
* glommio
* RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity
* replacing indexmap_amortized / simd_json with equivalents doesn't help
* SinkExt::send maybe doesn't wake up properly?
* related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ?
* extract_response_peers
* don't assume requesting peer is in list?
## Low priority
* config
* add flag to print parsed config when starting
* fail on unrecognized keys
* quit whole program if any thread panics
* implement socket_recv_size and ipv6_only in glommio implementations
* newer glommio versions might use SIGUSR1 internally, see glommio fe33e30
* CI
* file transfer CI for all implementations
* test access lists?
* cargo-deny
* aquatic_udp
* look at proper cpu pinning (check that one thread gets bound per core)
@ -27,19 +44,27 @@
* move additional request sending to for each received response, maybe
with probability 0.2
* aquatic_ws
* glommio
* proper cpu set pinning
* general
* large amount of temporary allocations in serialize_20_bytes, pretty many in deserialize_20_bytes
* extract response peers: extract "one extra" to compensate for removal,
of sender if present in selection?
# Not important
* aquatic_http:
* clean out connections regularly
* Rc<RefCell<ValidUntil>> which get set on successful request parsing and
successful response sending. Clone kept in connection slab which gets cleaned
periodically (= cancel tasks). Means that task handle will need to be stored in slab.
Config vars kill_idle_connections: bool, max_idle_connection_time. Remove keepalive.
* handle panicked/cancelled tasks?
* optimize?
* get_peer_addr only once (takes 1.2% of runtime)
* queue response: allocating takes 2.8% of runtime
* use futures-rustls for load test
* consider better error type for request parsing, so that better error
messages can be sent back (e.g., "full scrapes are not supported")
* test torrent transfer with real clients
* scrape: does it work (serialization etc), and with multiple hashes?
* 'left' optional in magnet requests? Probably not. Transmission sends huge
positive number.
* aquatic_ws
* mio
@ -51,49 +76,8 @@
* deregistering before closing is required by mio, but it hurts performance
* blocked on https://github.com/snapview/tungstenite-rs/issues/51
* connection closing: send tls close message etc?
* glommio
* proper cpu set pinning
* RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity
* replacing indexmap_amortized / simd_json with equivalents doesn't help
* SinkExt::send maybe doesn't wake up properly?
* related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ?
* general
* large amount of temporary allocations in serialize_20_bytes, pretty many in deserialize_20_bytes
* extract_response_peers
* don't assume requesting peer is in list
# Less important
* extract response peers: extract "one extra" to compensate for removal,
of sender if present in selection? maybe make criterion benchmark,
optimize
## aquatic_http_load_test
* how handle large number of peers for "popular" torrents in keepalive mode?
maybe it is ok
## aquatic_http
* test torrent transfer with real clients
* scrape: does it work (serialization etc), and with multiple hashes?
* 'left' optional in magnet requests? Probably not. Transmission sends huge
positive number.
* compact=0 should result in error response
## aquatic_ws_load_test
* very small amount of connections means very small number of peers per
torrents, so tracker handling of large number is not really assessed
# Not important
## aquatic_ws
* write new version of extract_response_peers which checks for equality with
peer sending request? It could return an arrayvec or smallvec by the way
(but then the size needs to be adjusted together with the corresponding
config var, or the config var needs to be removed)
## aquatic_cli_helpers
* Include config field comments in exported toml (likely quite a bit of work)
* write new version of extract_response_peers which checks for equality with
peer sending request???
# Don't do

View file

@ -8,8 +8,9 @@ description = "aquatic BitTorrent tracker CLI helpers"
repository = "https://github.com/greatest-ape/aquatic"
[dependencies]
aquatic_toml_config = "0.1.0"
anyhow = "1"
serde = { version = "1", features = ["derive"] }
simplelog = "0.11"
toml = "0.5"
aquatic_toml_config = "0.1.0"

View file

@ -2,9 +2,9 @@ use std::fs::File;
use std::io::Read;
use anyhow::Context;
use aquatic_toml_config::TomlConfig;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use simplelog::{ColorChoice, ConfigBuilder, LevelFilter, TermLogger, TerminalMode};
use aquatic_toml_config::TomlConfig;
/// Log level. Available values are off, error, warn, info, debug and trace.
#[derive(Debug, Clone, Copy, PartialEq, TomlConfig, Serialize, Deserialize)]

View file

@ -14,6 +14,8 @@ name = "aquatic_common"
cpu-pinning = ["hwloc", "libc"]
[dependencies]
aquatic_toml_config = "0.1.0"
ahash = "0.7"
anyhow = "1"
arc-swap = "1"
@ -24,7 +26,6 @@ log = "0.4"
privdrop = "0.5"
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
aquatic_toml_config = "0.1.0"
# cpu-pinning
hwloc = { version = "0.5", optional = true }

View file

@ -4,10 +4,10 @@ use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
use aquatic_toml_config::TomlConfig;
use arc_swap::{ArcSwap, Cache};
use hashbrown::HashSet;
use serde::{Deserialize, Serialize};
use aquatic_toml_config::TomlConfig;
/// Access list mode. Available modes are white, black and off.
#[derive(Clone, Copy, Debug, PartialEq, TomlConfig, Serialize, Deserialize)]

View file

@ -1,6 +1,6 @@
use aquatic_toml_config::TomlConfig;
use hwloc::{CpuSet, ObjectType, Topology, CPUBIND_THREAD};
use serde::{Deserialize, Serialize};
use aquatic_toml_config::TomlConfig;
#[derive(Clone, Debug, PartialEq, TomlConfig, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]

View file

@ -52,16 +52,14 @@ where
if peer_map_len <= max_num_peers_to_take + 1 {
let mut peers = Vec::with_capacity(peer_map_len);
peers.extend(peer_map
.iter()
.filter_map(|(k, v)| {
if *k == sender_peer_map_key {
None
} else {
Some(peer_conversion_function(v))
}
}));
peers.extend(peer_map.iter().filter_map(|(k, v)| {
if *k == sender_peer_map_key {
None
} else {
Some(peer_conversion_function(v))
}
}));
peers
} else {
let half_num_to_take = max_num_peers_to_take / 2;

View file

@ -6,9 +6,9 @@ use std::{
time::Duration,
};
use aquatic_toml_config::TomlConfig;
use privdrop::PrivDrop;
use serde::Deserialize;
use aquatic_toml_config::TomlConfig;
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]

View file

@ -17,10 +17,12 @@ name = "aquatic_http"
cpu-pinning = ["aquatic_common/cpu-pinning"]
[dependencies]
anyhow = "1"
aquatic_cli_helpers = "0.1.0"
aquatic_common = "0.1.0"
aquatic_http_protocol = "0.1.0"
aquatic_toml_config = "0.1.0"
anyhow = "1"
cfg-if = "1"
either = "1"
futures-lite = "1"
@ -38,7 +40,6 @@ serde = { version = "1", features = ["derive"] }
signal-hook = { version = "0.3" }
slab = "0.4"
smartstring = "0.2"
aquatic_toml_config = "0.1.0"
[dev-dependencies]
quickcheck = "1"

View file

@ -1,8 +1,8 @@
use std::{net::SocketAddr, path::PathBuf};
use aquatic_common::{access_list::AccessListConfig, privileges::PrivilegeConfig};
use serde::Deserialize;
use aquatic_toml_config::TomlConfig;
use serde::Deserialize;
use aquatic_cli_helpers::LogLevel;

View file

@ -13,10 +13,12 @@ name = "aquatic_http_load_test"
cpu-pinning = ["aquatic_common/cpu-pinning"]
[dependencies]
anyhow = "1"
aquatic_cli_helpers = "0.1.0"
aquatic_common = "0.1.0"
aquatic_http_protocol = "0.1.0"
aquatic_toml_config = "0.1.0"
anyhow = "1"
futures-lite = "1"
hashbrown = "0.12"
glommio = "0.7"
@ -26,7 +28,6 @@ rand = { version = "0.8", features = ["small_rng"] }
rand_distr = "0.4"
rustls = { version = "0.20", features = ["dangerous_configuration"] }
serde = { version = "1", features = ["derive"] }
aquatic_toml_config = "0.1.0"
[dev-dependencies]
quickcheck = "1"

View file

@ -1,8 +1,8 @@
use std::net::SocketAddr;
use aquatic_cli_helpers::LogLevel;
use serde::Deserialize;
use aquatic_toml_config::TomlConfig;
use serde::Deserialize;
/// aquatic_http_load_test configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]

View file

@ -1,5 +1,5 @@
pub use toml;
pub use aquatic_toml_config_derive::TomlConfig;
pub use toml;
/// Run this on your struct implementing TomlConfig to generate a
/// serialization/deserialization test for it.

View file

@ -17,12 +17,14 @@ name = "aquatic_udp"
cpu-pinning = ["aquatic_common/cpu-pinning"]
[dependencies]
anyhow = "1"
aquatic_cli_helpers = "0.1.0"
aquatic_common = "0.1.0"
aquatic_toml_config = "0.1.0"
aquatic_udp_protocol = "0.1.0"
chrono = "0.4"
anyhow = "1"
cfg-if = "1"
chrono = "0.4"
crossbeam-channel = "0.5"
hex = "0.4"
log = "0.4"
@ -31,11 +33,10 @@ mio = { version = "0.8", features = ["net", "os-poll"] }
num-format = "0.4"
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
slab = "0.4"
signal-hook = { version = "0.3" }
slab = "0.4"
socket2 = { version = "0.4", features = ["all"] }
tinytemplate = "1"
aquatic_toml_config = "0.1.0"
[dev-dependencies]
quickcheck = "1"

View file

@ -1,6 +1,6 @@
use std::collections::BTreeMap;
use std::hash::Hash;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
@ -14,22 +14,6 @@ use crate::config::Config;
pub const MAX_PACKET_SIZE: usize = 8192;
pub trait Ip: Hash + PartialEq + Eq + Clone + Copy {
fn ip_addr(self) -> IpAddr;
}
impl Ip for Ipv4Addr {
fn ip_addr(self) -> IpAddr {
IpAddr::V4(self)
}
}
impl Ip for Ipv6Addr {
fn ip_addr(self) -> IpAddr {
IpAddr::V6(self)
}
}
#[derive(Debug)]
pub struct PendingScrapeRequest {
pub slab_key: usize,
@ -50,8 +34,8 @@ pub enum ConnectedRequest {
#[derive(Debug)]
pub enum ConnectedResponse {
AnnounceIpv4(AnnounceResponseIpv4),
AnnounceIpv6(AnnounceResponseIpv6),
AnnounceIpv4(AnnounceResponse<Ipv4Addr>),
AnnounceIpv6(AnnounceResponse<Ipv6Addr>),
Scrape(PendingScrapeResponse),
}
@ -234,14 +218,14 @@ mod tests {
let config = Config::default();
let peers = ::std::iter::repeat(ResponsePeerIpv6 {
let peers = ::std::iter::repeat(ResponsePeer {
ip_address: Ipv6Addr::new(1, 1, 1, 1, 1, 1, 1, 1),
port: Port(1),
})
.take(config.protocol.max_response_peers)
.collect();
let response = Response::AnnounceIpv6(AnnounceResponseIpv6 {
let response = Response::AnnounceIpv6(AnnounceResponse {
transaction_id: TransactionId(1),
announce_interval: AnnounceInterval(1),
seeders: NumberOfPeers(1),

View file

@ -123,8 +123,8 @@ pub struct ProtocolConfig {
impl Default for ProtocolConfig {
fn default() -> Self {
Self {
max_scrape_torrents: 255,
max_response_peers: 255,
max_scrape_torrents: 70,
max_response_peers: 50,
peer_announce_interval: 60 * 15,
}
}
@ -188,7 +188,7 @@ impl Default for CleaningConfig {
connection_cleaning_interval: 60,
torrent_cleaning_interval: 60 * 2,
pending_scrape_cleaning_interval: 60 * 10,
max_connection_age: 60 * 5,
max_connection_age: 60 * 2,
max_peer_age: 60 * 20,
max_pending_scrape_age: 60,
}

View file

@ -30,6 +30,15 @@ struct Peer<I: Ip> {
pub valid_until: ValidUntil,
}
impl<I: Ip> Peer<I> {
fn to_response_peer(&self) -> ResponsePeer<I> {
ResponsePeer {
ip_address: self.ip_address,
port: self.port,
}
}
}
type PeerMap<I> = AHashIndexMap<PeerId, Peer<I>>;
struct TorrentData<I: Ip> {
@ -111,68 +120,6 @@ impl TorrentMaps {
}
}
#[derive(Clone, PartialEq, Debug)]
pub struct ProtocolResponsePeer<I> {
pub ip_address: I,
pub port: Port,
}
impl<I: Ip> ProtocolResponsePeer<I> {
#[inline(always)]
fn from_peer(peer: &Peer<I>) -> Self {
Self {
ip_address: peer.ip_address,
port: peer.port,
}
}
}
pub struct ProtocolAnnounceResponse<I> {
pub transaction_id: TransactionId,
pub announce_interval: AnnounceInterval,
pub leechers: NumberOfPeers,
pub seeders: NumberOfPeers,
pub peers: Vec<ProtocolResponsePeer<I>>,
}
impl Into<ConnectedResponse> for ProtocolAnnounceResponse<Ipv4Addr> {
fn into(self) -> ConnectedResponse {
ConnectedResponse::AnnounceIpv4(AnnounceResponseIpv4 {
transaction_id: self.transaction_id,
announce_interval: self.announce_interval,
leechers: self.leechers,
seeders: self.seeders,
peers: self
.peers
.into_iter()
.map(|peer| ResponsePeerIpv4 {
ip_address: peer.ip_address,
port: peer.port,
})
.collect(),
})
}
}
impl Into<ConnectedResponse> for ProtocolAnnounceResponse<Ipv6Addr> {
fn into(self) -> ConnectedResponse {
ConnectedResponse::AnnounceIpv6(AnnounceResponseIpv6 {
transaction_id: self.transaction_id,
announce_interval: self.announce_interval,
leechers: self.leechers,
seeders: self.seeders,
peers: self
.peers
.into_iter()
.map(|peer| ResponsePeerIpv6 {
ip_address: peer.ip_address,
port: peer.port,
})
.collect(),
})
}
}
pub fn run_request_worker(
config: Config,
state: State,
@ -258,24 +205,22 @@ fn handle_announce_request(
peer_valid_until: ValidUntil,
) -> ConnectedResponse {
match src.get().ip() {
IpAddr::V4(ip) => handle_announce_request_inner(
IpAddr::V4(ip) => ConnectedResponse::AnnounceIpv4(handle_announce_request_inner(
config,
rng,
&mut torrents.ipv4,
request,
ip,
peer_valid_until,
)
.into(),
IpAddr::V6(ip) => handle_announce_request_inner(
)),
IpAddr::V6(ip) => ConnectedResponse::AnnounceIpv6(handle_announce_request_inner(
config,
rng,
&mut torrents.ipv6,
request,
ip,
peer_valid_until,
)
.into(),
)),
}
}
@ -286,7 +231,7 @@ fn handle_announce_request_inner<I: Ip>(
request: AnnounceRequest,
peer_ip: I,
peer_valid_until: ValidUntil,
) -> ProtocolAnnounceResponse<I> {
) -> AnnounceResponse<I> {
let peer_status = PeerStatus::from_event_and_bytes_left(request.event, request.bytes_left);
let peer = Peer {
@ -329,10 +274,10 @@ fn handle_announce_request_inner<I: Ip>(
&torrent_data.peers,
max_num_peers_to_take,
request.peer_id,
ProtocolResponsePeer::from_peer,
Peer::to_response_peer,
);
ProtocolAnnounceResponse {
AnnounceResponse {
transaction_id: request.transaction_id,
announce_interval: AnnounceInterval(config.protocol.peer_announce_interval),
leechers: NumberOfPeers(torrent_data.num_leechers as i32),
@ -448,7 +393,7 @@ mod tests {
if i == 0 {
opt_sender_key = Some(key);
opt_sender_peer = Some(ProtocolResponsePeer::from_peer(&peer));
opt_sender_peer = Some(peer.to_response_peer());
}
peer_map.insert(key, peer);
@ -461,7 +406,7 @@ mod tests {
&peer_map,
req_num_peers,
opt_sender_key.unwrap_or_else(|| gen_peer_id(1)),
ProtocolResponsePeer::from_peer,
Peer::to_response_peer,
);
// Check that number of returned peers is correct

View file

@ -10,16 +10,17 @@ repository = "https://github.com/greatest-ape/aquatic"
name = "aquatic_udp_bench"
[dependencies]
anyhow = "1"
aquatic_cli_helpers = "0.1.0"
aquatic_common = "0.1.0"
aquatic_toml_config = "0.1.0"
aquatic_udp = "0.1.0"
aquatic_udp_protocol = "0.1.0"
anyhow = "1"
crossbeam-channel = "0.5"
indicatif = "0.16"
mimalloc = { version = "0.1", default-features = false }
num-format = "0.4"
rand = { version = "0.8", features = ["small_rng"] }
rand_distr = "0.4"
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
aquatic_toml_config = "0.1.0"

View file

@ -1,5 +1,5 @@
use serde::Deserialize;
use aquatic_toml_config::TomlConfig;
use serde::Deserialize;
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
pub struct BenchConfig {

View file

@ -13,18 +13,19 @@ name = "aquatic_udp_load_test"
cpu-pinning = ["aquatic_common/cpu-pinning"]
[dependencies]
anyhow = "1"
aquatic_cli_helpers = "0.1.0"
aquatic_common = "0.1.0"
aquatic_toml_config = "0.1.0"
aquatic_udp_protocol = "0.1.0"
anyhow = "1"
hashbrown = "0.12"
mimalloc = { version = "0.1", default-features = false }
mio = { version = "0.8", features = ["net", "os-poll"] }
rand = { version = "0.8", features = ["small_rng"] }
rand_distr = "0.4"
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
socket2 = { version = "0.4", features = ["all"] }
aquatic_toml_config = "0.1.0"
[dev-dependencies]
quickcheck = "1"

View file

@ -56,7 +56,7 @@ pub fn run_worker_thread(
for event in events.iter() {
if (event.token() == token) & event.is_readable() {
while let Ok(amt) = socket.recv(&mut buffer) {
match Response::from_bytes(&buffer[0..amt]) {
match Response::from_bytes(&buffer[0..amt], addr.is_ipv4()) {
Ok(response) => {
match response {
Response::AnnounceIpv4(ref r) => {

View file

@ -114,21 +114,21 @@ fn create_random_request(
transaction_id: TransactionId,
torrent_peer: &TorrentPeer,
) -> Request {
let weights = vec![
config.requests.weight_announce as u32,
config.requests.weight_connect as u32,
config.requests.weight_scrape as u32,
];
let items = vec![
const ITEMS: [RequestType; 3] = [
RequestType::Announce,
RequestType::Connect,
RequestType::Scrape,
];
let dist = WeightedIndex::new(&weights).expect("random request weighted index");
let weights = [
config.requests.weight_announce as u32,
config.requests.weight_connect as u32,
config.requests.weight_scrape as u32,
];
match items[dist.sample(rng)] {
let dist = WeightedIndex::new(weights).expect("random request weighted index");
match ITEMS[dist.sample(rng)] {
RequestType::Announce => create_announce_request(config, rng, torrent_peer, transaction_id),
RequestType::Connect => create_connect_request(transaction_id),
RequestType::Scrape => create_scrape_request(&info_hashes, torrent_peer, transaction_id),
@ -209,7 +209,7 @@ fn create_torrent_peer(
scrape_hash_indeces,
connection_id,
peer_id: generate_peer_id(),
port: Port(rand::random()),
port: Port(rng.gen()),
}
}

View file

@ -1,5 +1,11 @@
use std::fmt::Debug;
use std::net::{Ipv4Addr, Ipv6Addr};
pub trait Ip: Clone + Copy + Debug + PartialEq + Eq {}
impl Ip for Ipv4Addr {}
impl Ip for Ipv6Addr {}
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct AnnounceInterval(pub i32);
@ -30,15 +36,9 @@ pub struct PeerId(pub [u8; 20]);
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct PeerKey(pub u32);
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
pub struct ResponsePeerIpv4 {
pub ip_address: Ipv4Addr,
pub port: Port,
}
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
pub struct ResponsePeerIpv6 {
pub ip_address: Ipv6Addr,
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct ResponsePeer<I: Ip> {
pub ip_address: I,
pub port: Port,
}
@ -69,17 +69,7 @@ impl quickcheck::Arbitrary for PeerId {
}
#[cfg(test)]
impl quickcheck::Arbitrary for ResponsePeerIpv4 {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {
Self {
ip_address: quickcheck::Arbitrary::arbitrary(g),
port: Port(u16::arbitrary(g).into()),
}
}
}
#[cfg(test)]
impl quickcheck::Arbitrary for ResponsePeerIpv6 {
impl<I: Ip + quickcheck::Arbitrary> quickcheck::Arbitrary for ResponsePeer<I> {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {
Self {
ip_address: quickcheck::Arbitrary::arbitrary(g),

View file

@ -21,21 +21,12 @@ pub struct ConnectResponse {
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct AnnounceResponseIpv4 {
pub struct AnnounceResponse<I: Ip> {
pub transaction_id: TransactionId,
pub announce_interval: AnnounceInterval,
pub leechers: NumberOfPeers,
pub seeders: NumberOfPeers,
pub peers: Vec<ResponsePeerIpv4>,
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct AnnounceResponseIpv6 {
pub transaction_id: TransactionId,
pub announce_interval: AnnounceInterval,
pub leechers: NumberOfPeers,
pub seeders: NumberOfPeers,
pub peers: Vec<ResponsePeerIpv6>,
pub peers: Vec<ResponsePeer<I>>,
}
#[derive(PartialEq, Eq, Clone, Debug)]
@ -53,8 +44,8 @@ pub struct ErrorResponse {
#[derive(PartialEq, Eq, Clone, Debug)]
pub enum Response {
Connect(ConnectResponse),
AnnounceIpv4(AnnounceResponseIpv4),
AnnounceIpv6(AnnounceResponseIpv6),
AnnounceIpv4(AnnounceResponse<Ipv4Addr>),
AnnounceIpv6(AnnounceResponse<Ipv6Addr>),
Scrape(ScrapeResponse),
Error(ErrorResponse),
}
@ -65,14 +56,14 @@ impl From<ConnectResponse> for Response {
}
}
impl From<AnnounceResponseIpv4> for Response {
fn from(r: AnnounceResponseIpv4) -> Self {
impl From<AnnounceResponse<Ipv4Addr>> for Response {
fn from(r: AnnounceResponse<Ipv4Addr>) -> Self {
Self::AnnounceIpv4(r)
}
}
impl From<AnnounceResponseIpv6> for Response {
fn from(r: AnnounceResponseIpv6) -> Self {
impl From<AnnounceResponse<Ipv6Addr>> for Response {
fn from(r: AnnounceResponse<Ipv6Addr>) -> Self {
Self::AnnounceIpv6(r)
}
}
@ -90,12 +81,6 @@ impl From<ErrorResponse> for Response {
}
impl Response {
/// Returning IPv6 peers doesn't really work with UDP. It is not supported
/// by https://libtorrent.org/udp_tracker_protocol.html. There is a
/// suggestion in https://web.archive.org/web/20170503181830/http://opentracker.blog.h3q.com/2007/12/28/the-ipv6-situation/
/// of using action number 4 and returning IPv6 octets just like for IPv4
/// addresses. Clients seem not to support it very well, but due to a lack
/// of alternative solutions, it is implemented here.
#[inline]
pub fn write(&self, bytes: &mut impl Write) -> Result<(), io::Error> {
match self {
@ -116,6 +101,18 @@ impl Response {
bytes.write_u16::<NetworkEndian>(peer.port.0)?;
}
}
Response::AnnounceIpv6(r) => {
bytes.write_i32::<NetworkEndian>(1)?;
bytes.write_i32::<NetworkEndian>(r.transaction_id.0)?;
bytes.write_i32::<NetworkEndian>(r.announce_interval.0)?;
bytes.write_i32::<NetworkEndian>(r.leechers.0)?;
bytes.write_i32::<NetworkEndian>(r.seeders.0)?;
for peer in r.peers.iter() {
bytes.write_all(&peer.ip_address.octets())?;
bytes.write_u16::<NetworkEndian>(peer.port.0)?;
}
}
Response::Scrape(r) => {
bytes.write_i32::<NetworkEndian>(2)?;
bytes.write_i32::<NetworkEndian>(r.transaction_id.0)?;
@ -132,25 +129,13 @@ impl Response {
bytes.write_all(r.message.as_bytes())?;
}
Response::AnnounceIpv6(r) => {
bytes.write_i32::<NetworkEndian>(4)?;
bytes.write_i32::<NetworkEndian>(r.transaction_id.0)?;
bytes.write_i32::<NetworkEndian>(r.announce_interval.0)?;
bytes.write_i32::<NetworkEndian>(r.leechers.0)?;
bytes.write_i32::<NetworkEndian>(r.seeders.0)?;
for peer in r.peers.iter() {
bytes.write_all(&peer.ip_address.octets())?;
bytes.write_u16::<NetworkEndian>(peer.port.0)?;
}
}
}
Ok(())
}
#[inline]
pub fn from_bytes(bytes: &[u8]) -> Result<Self, io::Error> {
pub fn from_bytes(bytes: &[u8], ipv4: bool) -> Result<Self, io::Error> {
let mut cursor = Cursor::new(bytes);
let action = cursor.read_i32::<NetworkEndian>()?;
@ -168,7 +153,7 @@ impl Response {
.into())
}
// Announce
1 => {
1 if ipv4 => {
let announce_interval = cursor.read_i32::<NetworkEndian>()?;
let leechers = cursor.read_i32::<NetworkEndian>()?;
let seeders = cursor.read_i32::<NetworkEndian>()?;
@ -183,14 +168,45 @@ impl Response {
let ip_address = Ipv4Addr::from(ip_bytes);
let port = (&chunk[4..]).read_u16::<NetworkEndian>().unwrap();
ResponsePeerIpv4 {
ResponsePeer {
ip_address,
port: Port(port),
}
})
.collect();
Ok((AnnounceResponseIpv4 {
Ok((AnnounceResponse {
transaction_id: TransactionId(transaction_id),
announce_interval: AnnounceInterval(announce_interval),
leechers: NumberOfPeers(leechers),
seeders: NumberOfPeers(seeders),
peers,
})
.into())
}
1 if !ipv4 => {
let announce_interval = cursor.read_i32::<NetworkEndian>()?;
let leechers = cursor.read_i32::<NetworkEndian>()?;
let seeders = cursor.read_i32::<NetworkEndian>()?;
let position = cursor.position() as usize;
let inner = cursor.into_inner();
let peers = inner[position..]
.chunks_exact(18)
.map(|chunk| {
let ip_bytes: [u8; 16] = (&chunk[..16]).try_into().unwrap();
let ip_address = Ipv6Addr::from(ip_bytes);
let port = (&chunk[16..]).read_u16::<NetworkEndian>().unwrap();
ResponsePeer {
ip_address,
port: Port(port),
}
})
.collect();
Ok((AnnounceResponse {
transaction_id: TransactionId(transaction_id),
announce_interval: AnnounceInterval(announce_interval),
leechers: NumberOfPeers(leechers),
@ -240,38 +256,6 @@ impl Response {
})
.into())
}
// IPv6 announce
4 => {
let announce_interval = cursor.read_i32::<NetworkEndian>()?;
let leechers = cursor.read_i32::<NetworkEndian>()?;
let seeders = cursor.read_i32::<NetworkEndian>()?;
let position = cursor.position() as usize;
let inner = cursor.into_inner();
let peers = inner[position..]
.chunks_exact(18)
.map(|chunk| {
let ip_bytes: [u8; 16] = (&chunk[..16]).try_into().unwrap();
let ip_address = Ipv6Addr::from(ip_bytes);
let port = (&chunk[16..]).read_u16::<NetworkEndian>().unwrap();
ResponsePeerIpv6 {
ip_address,
port: Port(port),
}
})
.collect();
Ok((AnnounceResponseIpv6 {
transaction_id: TransactionId(transaction_id),
announce_interval: AnnounceInterval(announce_interval),
leechers: NumberOfPeers(leechers),
seeders: NumberOfPeers(seeders),
peers,
})
.into())
}
_ => Ok((ErrorResponse {
transaction_id: TransactionId(transaction_id),
message: "Invalid action".into(),
@ -306,26 +290,10 @@ mod tests {
}
}
impl quickcheck::Arbitrary for AnnounceResponseIpv4 {
impl<I: Ip + quickcheck::Arbitrary> quickcheck::Arbitrary for AnnounceResponse<I> {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {
let peers = (0..u8::arbitrary(g))
.map(|_| ResponsePeerIpv4::arbitrary(g))
.collect();
Self {
transaction_id: TransactionId(i32::arbitrary(g)),
announce_interval: AnnounceInterval(i32::arbitrary(g)),
leechers: NumberOfPeers(i32::arbitrary(g)),
seeders: NumberOfPeers(i32::arbitrary(g)),
peers,
}
}
}
impl quickcheck::Arbitrary for AnnounceResponseIpv6 {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {
let peers = (0..u8::arbitrary(g))
.map(|_| ResponsePeerIpv6::arbitrary(g))
.map(|_| ResponsePeer::arbitrary(g))
.collect();
Self {
@ -351,11 +319,11 @@ mod tests {
}
}
fn same_after_conversion(response: Response) -> bool {
fn same_after_conversion(response: Response, ipv4: bool) -> bool {
let mut buf = Vec::new();
response.clone().write(&mut buf).unwrap();
let r2 = Response::from_bytes(&buf[..]).unwrap();
let r2 = Response::from_bytes(&buf[..], ipv4).unwrap();
let success = response == r2;
@ -368,21 +336,21 @@ mod tests {
#[quickcheck]
fn test_connect_response_convert_identity(response: ConnectResponse) -> bool {
same_after_conversion(response.into())
same_after_conversion(response.into(), true)
}
#[quickcheck]
fn test_announce_response_ipv4_convert_identity(response: AnnounceResponseIpv4) -> bool {
same_after_conversion(response.into())
fn test_announce_response_ipv4_convert_identity(response: AnnounceResponse<Ipv4Addr>) -> bool {
same_after_conversion(response.into(), true)
}
#[quickcheck]
fn test_announce_response_ipv6_convert_identity(response: AnnounceResponseIpv6) -> bool {
same_after_conversion(response.into())
fn test_announce_response_ipv6_convert_identity(response: AnnounceResponse<Ipv6Addr>) -> bool {
same_after_conversion(response.into(), false)
}
#[quickcheck]
fn test_scrape_response_convert_identity(response: ScrapeResponse) -> bool {
same_after_conversion(response.into())
same_after_conversion(response.into(), true)
}
}

View file

@ -20,10 +20,12 @@ with-glommio = ["cpu-pinning", "async-tungstenite", "futures-lite", "futures", "
with-mio = ["crossbeam-channel", "histogram", "mio", "parking_lot", "socket2"]
[dependencies]
anyhow = "1"
aquatic_cli_helpers = "0.1.0"
aquatic_common = "0.1.0"
aquatic_toml_config = "0.1.0"
aquatic_ws_protocol = "0.1.0"
anyhow = "1"
cfg-if = "1"
either = "1"
hashbrown = { version = "0.12", features = ["serde"] }
@ -36,7 +38,6 @@ rustls-pemfile = "0.3"
serde = { version = "1", features = ["derive"] }
signal-hook = { version = "0.3" }
slab = "0.4"
aquatic_toml_config = "0.1.0"
tungstenite = "0.17"
# mio

View file

@ -13,11 +13,13 @@ name = "aquatic_ws_load_test"
cpu-pinning = ["aquatic_common/cpu-pinning"]
[dependencies]
anyhow = "1"
async-tungstenite = "0.17"
aquatic_cli_helpers = "0.1.0"
aquatic_common = "0.1.0"
aquatic_toml_config = "0.1.0"
aquatic_ws_protocol = "0.1.0"
anyhow = "1"
futures = "0.3"
futures-rustls = "0.22"
glommio = "0.7"
@ -28,7 +30,6 @@ rand_distr = "0.4"
rustls = { version = "0.20", features = ["dangerous_configuration"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
aquatic_toml_config = "0.1.0"
tungstenite = "0.17"
[dev-dependencies]

View file

@ -3,8 +3,8 @@ use std::net::SocketAddr;
use aquatic_cli_helpers::LogLevel;
#[cfg(feature = "cpu-pinning")]
use aquatic_common::cpu_pinning::CpuPinningConfig;
use serde::Deserialize;
use aquatic_toml_config::TomlConfig;
use serde::Deserialize;
/// aquatic_ws_load_test configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]