diff --git a/TODO.md b/TODO.md
index 4389fae..ed05daa 100644
--- a/TODO.md
+++ b/TODO.md
@@ -2,7 +2,9 @@
## General
-* use ipv4-mapped address functions
+* use ipv4-mapped address functions, but I should check that they really
+ work as they really work as they should. All announces over ipv4 should
+ go to ipv4 map, all over ipv6 to ipv6 map
* avx-512 should be avoided, maybe this should be mentioned in README
and maybe run scripts should be adjusted
@@ -11,8 +13,6 @@
* test tls
* current serialized byte strings valid
* scrape: does it work with multiple hashes?
-* store Ipv4Addr / Ipv6 addr in peer map, for correctness and so that strange
- conversion in handler doesn't have to occur
* compact=0 should result in error response
* tests of request parsing
* tests of response serialization (against data known to be good would be nice)
diff --git a/aquatic_http/src/lib/common.rs b/aquatic_http/src/lib/common.rs
index 3375480..c0f2549 100644
--- a/aquatic_http/src/lib/common.rs
+++ b/aquatic_http/src/lib/common.rs
@@ -1,4 +1,4 @@
-use std::net::{IpAddr, SocketAddr};
+use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::Arc;
use either::Either;
@@ -13,7 +13,7 @@ pub use aquatic_common::{ValidUntil, convert_ipv4_mapped_ipv4};
use crate::protocol::common::*;
use crate::protocol::request::Request;
-use crate::protocol::response::Response;
+use crate::protocol::response::{Response, ResponsePeer};
#[derive(Clone, Copy, Debug)]
@@ -26,19 +26,11 @@ pub struct ConnectionMeta {
}
-impl ConnectionMeta {
- pub fn map_ipv4_ip(&self) -> Self {
- let peer_addr = SocketAddr::new(
- convert_ipv4_mapped_ipv4(self.peer_addr.ip()),
- self.peer_addr.port()
- );
-
- Self {
- worker_index: self.worker_index,
- peer_addr,
- poll_token: self.poll_token
- }
- }
+#[derive(Clone, Copy, Debug)]
+pub struct PeerConnectionMeta
{
+ pub worker_index: usize,
+ pub poll_token: Token,
+ pub peer_ip_address: P,
}
@@ -71,32 +63,42 @@ impl PeerStatus {
#[derive(Clone, Copy)]
-pub struct Peer {
- pub connection_meta: ConnectionMeta,
+pub struct Peer
{
+ pub connection_meta: PeerConnectionMeta
,
pub port: u16,
pub status: PeerStatus,
pub valid_until: ValidUntil,
}
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct PeerMapKey {
- pub peer_id: PeerId,
- pub ip_or_key: Either
+impl Peer {
+ pub fn to_response_peer(&self) -> ResponsePeer {
+ ResponsePeer {
+ ip_address: self.connection_meta.peer_ip_address,
+ port: self.port
+ }
+ }
}
-pub type PeerMap = IndexMap;
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PeerMapKey {
+ pub peer_id: PeerId,
+ pub ip_or_key: Either
+}
-pub struct TorrentData {
- pub peers: PeerMap,
+pub type PeerMap
= IndexMap, Peer>;
+
+
+pub struct TorrentData {
+ pub peers: PeerMap,
pub num_seeders: usize,
pub num_leechers: usize,
}
-impl Default for TorrentData {
+impl Default for TorrentData {
#[inline]
fn default() -> Self {
Self {
@@ -108,13 +110,13 @@ impl Default for TorrentData {
}
-pub type TorrentMap = HashMap;
+pub type TorrentMap = HashMap>;
#[derive(Default)]
pub struct TorrentMaps {
- pub ipv4: TorrentMap,
- pub ipv6: TorrentMap,
+ pub ipv4: TorrentMap,
+ pub ipv6: TorrentMap,
}
diff --git a/aquatic_http/src/lib/handler.rs b/aquatic_http/src/lib/handler.rs
index 3e8d2da..caff517 100644
--- a/aquatic_http/src/lib/handler.rs
+++ b/aquatic_http/src/lib/handler.rs
@@ -1,6 +1,6 @@
use std::time::Duration;
use std::vec::Drain;
-use std::net::IpAddr;
+use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use either::Either;
use hashbrown::HashMap;
@@ -97,122 +97,155 @@ pub fn handle_announce_requests(
let valid_until = ValidUntil::new(config.cleaning.max_peer_age);
responses.extend(requests.map(|(request_sender_meta, request)| {
- let converted_request_sender_meta = request_sender_meta.map_ipv4_ip();
-
- let torrent_data: &mut TorrentData = if converted_request_sender_meta.peer_addr.is_ipv4(){
- torrent_maps.ipv4.entry(request.info_hash).or_default()
- } else {
- torrent_maps.ipv6.entry(request.info_hash).or_default()
- };
-
- // Insert/update/remove peer who sent this request
- {
- let request_sender_meta = converted_request_sender_meta;
-
- let peer_status = PeerStatus::from_event_and_bytes_left(
- request.event,
- Some(request.bytes_left)
- );
-
- let peer = Peer {
- connection_meta: request_sender_meta,
- port: request.port,
- status: peer_status,
- valid_until,
- };
-
- let ip_or_key = request.key
- .map(Either::Right)
- .unwrap_or_else(||
- Either::Left(request_sender_meta.peer_addr.ip())
- );
-
- let peer_map_key = PeerMapKey {
- peer_id: request.peer_id,
- ip_or_key,
- };
-
- let opt_removed_peer = match peer_status {
- PeerStatus::Leeching => {
- torrent_data.num_leechers += 1;
-
- torrent_data.peers.insert(peer_map_key, peer)
- },
- PeerStatus::Seeding => {
- torrent_data.num_seeders += 1;
-
- torrent_data.peers.insert(peer_map_key, peer)
- },
- PeerStatus::Stopped => {
- torrent_data.peers.remove(&peer_map_key)
- }
- };
-
- match opt_removed_peer.map(|peer| peer.status){
- Some(PeerStatus::Leeching) => {
- torrent_data.num_leechers -= 1;
- },
- Some(PeerStatus::Seeding) => {
- torrent_data.num_seeders -= 1;
- },
- _ => {}
- }
- }
-
- let max_num_peers_to_take = match request.numwant {
- Some(0) | None => config.protocol.max_peers,
- Some(numwant) => numwant.min(config.protocol.max_peers),
- };
-
- // FIXME: proper protocol peer should be extracted here, not below.
- // Ideally, protocol-specific IP should be stored in connection meta
- // in peer map.
- let response_peers: Vec = extract_response_peers(
- rng,
- &torrent_data.peers,
- max_num_peers_to_take,
- ResponsePeer::from_peer
+ let peer_ip = convert_ipv4_mapped_ipv4(
+ request_sender_meta.peer_addr.ip()
);
- let response_peers_v4 = response_peers.iter()
- .filter_map(|peer| {
- if let IpAddr::V4(ip_address) = peer.ip_address {
- Some(ResponsePeerV4 {
- ip_address,
- port: peer.port
- })
- } else {
- None
- }
- })
- .collect();
+ let response = match peer_ip {
+ IpAddr::V4(peer_ip_address) => {
+ let torrent_data: &mut TorrentData = torrent_maps.ipv4
+ .entry(request.info_hash)
+ .or_default();
+
+ let peer_connection_meta = PeerConnectionMeta {
+ worker_index: request_sender_meta.worker_index,
+ poll_token: request_sender_meta.poll_token,
+ peer_ip_address,
+ };
- let response_peers_v6 = response_peers.iter()
- .filter_map(|peer| {
- if let IpAddr::V6(ip_address) = peer.ip_address {
- Some(ResponsePeerV6 {
- ip_address,
- port: peer.port
- })
- } else {
- None
- }
- })
- .collect();
+ let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers(
+ config,
+ rng,
+ peer_connection_meta,
+ torrent_data,
+ request,
+ valid_until
+ );
- let response = Response::Announce(AnnounceResponse {
- complete: torrent_data.num_seeders,
- incomplete: torrent_data.num_leechers,
- announce_interval: config.protocol.peer_announce_interval,
- peers: ResponsePeerListV4(response_peers_v4),
- peers6: ResponsePeerListV6(response_peers_v6),
- });
+ let response = AnnounceResponse {
+ complete: seeders,
+ incomplete: leechers,
+ announce_interval: config.protocol.peer_announce_interval,
+ peers: ResponsePeerListV4(response_peers),
+ peers6: ResponsePeerListV6(vec![]),
+ };
+
+ Response::Announce(response)
+ },
+ IpAddr::V6(peer_ip_address) => {
+ let torrent_data: &mut TorrentData = torrent_maps.ipv6
+ .entry(request.info_hash)
+ .or_default();
+
+ let peer_connection_meta = PeerConnectionMeta {
+ worker_index: request_sender_meta.worker_index,
+ poll_token: request_sender_meta.poll_token,
+ peer_ip_address
+ };
+
+ let (seeders, leechers, response_peers) = upsert_peer_and_get_response_peers(
+ config,
+ rng,
+ peer_connection_meta,
+ torrent_data,
+ request,
+ valid_until
+ );
+
+ let response = AnnounceResponse {
+ complete: seeders,
+ incomplete: leechers,
+ announce_interval: config.protocol.peer_announce_interval,
+ peers: ResponsePeerListV4(vec![]),
+ peers6: ResponsePeerListV6(response_peers),
+ };
+
+ Response::Announce(response)
+ },
+ };
(request_sender_meta, response)
}));
}
+/// Insert/update peer. Return num_seeders, num_leechers and response peers
+fn upsert_peer_and_get_response_peers(
+ config: &Config,
+ rng: &mut impl Rng,
+ request_sender_meta: PeerConnectionMeta,
+ torrent_data: &mut TorrentData
,
+ request: AnnounceRequest,
+ valid_until: ValidUntil,
+) -> (usize, usize, Vec>) {
+ // Insert/update/remove peer who sent this request
+ {
+ let peer_status = PeerStatus::from_event_and_bytes_left(
+ request.event,
+ Some(request.bytes_left)
+ );
+
+ let peer = Peer {
+ connection_meta: request_sender_meta,
+ port: request.port,
+ status: peer_status,
+ valid_until,
+ };
+
+ let ip_or_key = request.key
+ .map(Either::Right)
+ .unwrap_or_else(||
+ Either::Left(request_sender_meta.peer_ip_address)
+ );
+
+ let peer_map_key = PeerMapKey {
+ peer_id: request.peer_id,
+ ip_or_key,
+ };
+
+ let opt_removed_peer = match peer_status {
+ PeerStatus::Leeching => {
+ torrent_data.num_leechers += 1;
+
+ torrent_data.peers.insert(peer_map_key, peer)
+ },
+ PeerStatus::Seeding => {
+ torrent_data.num_seeders += 1;
+
+ torrent_data.peers.insert(peer_map_key, peer)
+ },
+ PeerStatus::Stopped => {
+ torrent_data.peers.remove(&peer_map_key)
+ }
+ };
+
+ match opt_removed_peer.map(|peer| peer.status){
+ Some(PeerStatus::Leeching) => {
+ torrent_data.num_leechers -= 1;
+ },
+ Some(PeerStatus::Seeding) => {
+ torrent_data.num_seeders -= 1;
+ },
+ _ => {}
+ }
+ }
+
+ let max_num_peers_to_take = match request.numwant {
+ Some(0) | None => config.protocol.max_peers,
+ Some(numwant) => numwant.min(config.protocol.max_peers),
+ };
+
+ let response_peers: Vec> = extract_response_peers(
+ rng,
+ &torrent_data.peers,
+ max_num_peers_to_take,
+ Peer::to_response_peer
+ );
+
+ (torrent_data.num_seeders, torrent_data.num_leechers, response_peers)
+}
+
+
pub fn handle_scrape_requests(
config: &Config,
torrent_maps: &mut TorrentMaps,
@@ -228,25 +261,38 @@ pub fn handle_scrape_requests(
files: HashMap::with_capacity(num_to_take),
};
- let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4(){
- &mut torrent_maps.ipv4
- } else {
- &mut torrent_maps.ipv6
- };
+ let peer_ip = convert_ipv4_mapped_ipv4(
+ meta.peer_addr.ip()
+ );
// If request.info_hashes is empty, don't return scrape for all
// torrents, even though reference server does it. It is too expensive.
- for info_hash in request.info_hashes.into_iter().take(num_to_take){
- if let Some(torrent_data) = torrent_map.get(&info_hash){
- let stats = ScrapeStatistics {
- complete: torrent_data.num_seeders,
- downloaded: 0, // No implementation planned
- incomplete: torrent_data.num_leechers,
- };
+ if peer_ip.is_ipv4(){
+ for info_hash in request.info_hashes.into_iter().take(num_to_take){
+ if let Some(torrent_data) = torrent_maps.ipv4.get(&info_hash){
+ let stats = ScrapeStatistics {
+ complete: torrent_data.num_seeders,
+ downloaded: 0, // No implementation planned
+ incomplete: torrent_data.num_leechers,
+ };
- response.files.insert(info_hash, stats);
+ response.files.insert(info_hash, stats);
+ }
}
- }
+ } else {
+ for info_hash in request.info_hashes.into_iter().take(num_to_take){
+ if let Some(torrent_data) = torrent_maps.ipv6.get(&info_hash){
+ let stats = ScrapeStatistics {
+ complete: torrent_data.num_seeders,
+ downloaded: 0, // No implementation planned
+ incomplete: torrent_data.num_leechers,
+ };
+
+ response.files.insert(info_hash, stats);
+ }
+ }
+ };
+
(meta, Response::Scrape(response))
}));
diff --git a/aquatic_http/src/lib/protocol/common.rs b/aquatic_http/src/lib/protocol/common.rs
index 25949db..782bc22 100644
--- a/aquatic_http/src/lib/protocol/common.rs
+++ b/aquatic_http/src/lib/protocol/common.rs
@@ -5,7 +5,7 @@ use serde::Serialize;
use super::utils::*;
-#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize)]
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize)]
#[serde(transparent)]
pub struct PeerId(
#[serde(
@@ -15,7 +15,7 @@ pub struct PeerId(
);
-#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize)]
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize)]
#[serde(transparent)]
pub struct InfoHash(
#[serde(
diff --git a/aquatic_http/src/lib/protocol/response.rs b/aquatic_http/src/lib/protocol/response.rs
index 1fb905e..fd9b217 100644
--- a/aquatic_http/src/lib/protocol/response.rs
+++ b/aquatic_http/src/lib/protocol/response.rs
@@ -1,42 +1,15 @@
-use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
+use std::net::{Ipv4Addr, Ipv6Addr};
use hashbrown::HashMap;
use serde::Serialize;
-use crate::common::Peer;
-
use super::common::*;
use super::utils::*;
-pub struct ResponsePeer {
- pub ip_address: IpAddr,
- pub port: u16
-}
-
-
-impl ResponsePeer {
- pub fn from_peer(peer: &Peer) -> Self {
- let ip_address = peer.connection_meta.peer_addr.ip();
-
- Self {
- ip_address,
- port: peer.port
- }
- }
-}
-
-
-#[derive(Clone, Copy, Debug, Serialize)]
-pub struct ResponsePeerV4 {
- pub ip_address: Ipv4Addr,
- pub port: u16
-}
-
-
-#[derive(Clone, Copy, Debug, Serialize)]
-pub struct ResponsePeerV6 {
- pub ip_address: Ipv6Addr,
+#[derive(Debug, Clone, Serialize)]
+pub struct ResponsePeer{
+ pub ip_address: I,
pub port: u16
}
@@ -45,7 +18,7 @@ pub struct ResponsePeerV6 {
#[serde(transparent)]
pub struct ResponsePeerListV4(
#[serde(serialize_with = "serialize_response_peers_ipv4")]
- pub Vec
+ pub Vec>
);
@@ -53,7 +26,7 @@ pub struct ResponsePeerListV4(
#[serde(transparent)]
pub struct ResponsePeerListV6(
#[serde(serialize_with = "serialize_response_peers_ipv6")]
- pub Vec
+ pub Vec>
);
diff --git a/aquatic_http/src/lib/protocol/utils.rs b/aquatic_http/src/lib/protocol/utils.rs
index ff681d7..eea0d86 100644
--- a/aquatic_http/src/lib/protocol/utils.rs
+++ b/aquatic_http/src/lib/protocol/utils.rs
@@ -1,6 +1,8 @@
+use std::net::{Ipv4Addr, Ipv6Addr};
+
use serde::Serializer;
-use super::response::{ResponsePeerV4, ResponsePeerV6};
+use super::response::ResponsePeer;
/// Not for serde
@@ -41,7 +43,7 @@ pub fn serialize_20_bytes(
pub fn serialize_response_peers_ipv4(
- response_peers: &[ResponsePeerV4],
+ response_peers: &[ResponsePeer],
serializer: S
) -> Result where S: Serializer {
let mut bytes = Vec::with_capacity(response_peers.len() * 6);
@@ -56,7 +58,7 @@ pub fn serialize_response_peers_ipv4(
pub fn serialize_response_peers_ipv6(
- response_peers: &[ResponsePeerV6],
+ response_peers: &[ResponsePeer],
serializer: S
) -> Result where S: Serializer {
let mut bytes = Vec::with_capacity(response_peers.len() * 6);
diff --git a/aquatic_http/src/lib/tasks.rs b/aquatic_http/src/lib/tasks.rs
index e91e5e4..ad901dc 100644
--- a/aquatic_http/src/lib/tasks.rs
+++ b/aquatic_http/src/lib/tasks.rs
@@ -4,24 +4,26 @@ use crate::common::*;
pub fn clean_torrents(state: &State){
- fn clean_torrent_map(
- torrent_map: &mut TorrentMap,
- ){
- let now = Instant::now();
-
- torrent_map.retain(|_, torrent_data| {
- torrent_data.peers.retain(|_, peer| {
- peer.valid_until.0 >= now
- });
-
- !torrent_data.peers.is_empty()
- });
-
- torrent_map.shrink_to_fit();
- }
let mut torrent_maps = state.torrent_maps.lock();
clean_torrent_map(&mut torrent_maps.ipv4);
clean_torrent_map(&mut torrent_maps.ipv6);
+}
+
+
+fn clean_torrent_map(
+ torrent_map: &mut TorrentMap,
+){
+ let now = Instant::now();
+
+ torrent_map.retain(|_, torrent_data| {
+ torrent_data.peers.retain(|_, peer| {
+ peer.valid_until.0 >= now
+ });
+
+ !torrent_data.peers.is_empty()
+ });
+
+ torrent_map.shrink_to_fit();
}
\ No newline at end of file