udp: index peers by packet src ip and provided port instead of peer_id

This commit is contained in:
Joakim Frostegård 2024-01-04 17:04:17 +01:00
parent df13ae9399
commit 0eaa4475e2
4 changed files with 27 additions and 27 deletions

View file

@ -18,6 +18,9 @@
#### Changed #### Changed
* Index peers by packet source IP and provided port, instead of by peer_id.
This prevents users from impersonating others and is likely also slightly
faster for IPv4 peers.
* Remove support for unbounded worker channels * Remove support for unbounded worker channels
* Add backpressure in socket workers. They will postpone reading from the * Add backpressure in socket workers. They will postpone reading from the
socket if sending a request to a swarm worker failed socket if sending a request to a swarm worker failed

View file

@ -3,8 +3,12 @@
## High priority ## High priority
* aquatic_bench * aquatic_bench
* Check if opentracker is slow to get up to speed, adjust bencher * Opentracker "slow to get up to speed", is it due to getting faster once
* Maybe investigate aquatic memory use inserts are rarely needed since most ip-port combinations have been sent?
In that case, a shorter duration (e.g., 30 seconds) would be a good idea.
* Maybe investigate aquatic memory use.
* Would it use significantly less memory to store peers in an ArrayVec if
there are only, say, 2 of them?
* CI transfer test * CI transfer test
* add HTTP without TLS * add HTTP without TLS

View file

@ -157,7 +157,7 @@ impl<I: Ip> TorrentMap<I> {
} }
pub struct TorrentData<I: Ip> { pub struct TorrentData<I: Ip> {
peers: IndexMap<PeerId, Peer<I>>, peers: IndexMap<ResponsePeer<I>, Peer>,
num_seeders: usize, num_seeders: usize,
} }
@ -184,7 +184,12 @@ impl<I: Ip> TorrentData<I> {
let status = let status =
PeerStatus::from_event_and_bytes_left(request.event.into(), request.bytes_left); PeerStatus::from_event_and_bytes_left(request.event.into(), request.bytes_left);
let opt_removed_peer = self.peers.remove(&request.peer_id); let peer_map_key = ResponsePeer {
ip_address,
port: request.port,
};
let opt_removed_peer = self.peers.remove(&peer_map_key);
if let Some(Peer { if let Some(Peer {
is_seeder: true, .. is_seeder: true, ..
@ -208,20 +213,19 @@ impl<I: Ip> TorrentData<I> {
rng, rng,
&self.peers, &self.peers,
max_num_peers_to_take, max_num_peers_to_take,
Peer::to_response_peer, |k, _| *k,
&mut response.peers, &mut response.peers,
); );
match status { match status {
PeerStatus::Leeching => { PeerStatus::Leeching => {
let peer = Peer { let peer = Peer {
ip_address, peer_id: request.peer_id,
port: request.port,
is_seeder: false, is_seeder: false,
valid_until, valid_until,
}; };
self.peers.insert(request.peer_id, peer); self.peers.insert(peer_map_key, peer);
if config.statistics.peer_clients && opt_removed_peer.is_none() { if config.statistics.peer_clients && opt_removed_peer.is_none() {
statistics_sender statistics_sender
@ -231,13 +235,12 @@ impl<I: Ip> TorrentData<I> {
} }
PeerStatus::Seeding => { PeerStatus::Seeding => {
let peer = Peer { let peer = Peer {
ip_address, peer_id: request.peer_id,
port: request.port,
is_seeder: true, is_seeder: true,
valid_until, valid_until,
}; };
self.peers.insert(request.peer_id, peer); self.peers.insert(peer_map_key, peer);
self.num_seeders += 1; self.num_seeders += 1;
@ -279,7 +282,7 @@ impl<I: Ip> TorrentData<I> {
statistics_sender: &Sender<StatisticsMessage>, statistics_sender: &Sender<StatisticsMessage>,
now: SecondsSinceServerStart, now: SecondsSinceServerStart,
) { ) {
self.peers.retain(|peer_id, peer| { self.peers.retain(|_, peer| {
let keep = peer.valid_until.valid(now); let keep = peer.valid_until.valid(now);
if !keep { if !keep {
@ -288,7 +291,7 @@ impl<I: Ip> TorrentData<I> {
} }
if config.statistics.peer_clients { if config.statistics.peer_clients {
if let Err(_) = if let Err(_) =
statistics_sender.try_send(StatisticsMessage::PeerRemoved(*peer_id)) statistics_sender.try_send(StatisticsMessage::PeerRemoved(peer.peer_id))
{ {
// Should never happen in practice // Should never happen in practice
::log::error!("Couldn't send StatisticsMessage::PeerRemoved"); ::log::error!("Couldn't send StatisticsMessage::PeerRemoved");
@ -315,22 +318,12 @@ impl<I: Ip> Default for TorrentData<I> {
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct Peer<I: Ip> { struct Peer {
ip_address: I, peer_id: PeerId,
port: Port,
is_seeder: bool, is_seeder: bool,
valid_until: ValidUntil, valid_until: ValidUntil,
} }
impl<I: Ip> Peer<I> {
fn to_response_peer(_: &PeerId, peer: &Self) -> ResponsePeer<I> {
ResponsePeer {
ip_address: peer.ip_address,
port: peer.port,
}
}
}
/// Extract response peers /// Extract response peers
/// ///
/// If there are more peers in map than `max_num_peers_to_take`, do a random /// If there are more peers in map than `max_num_peers_to_take`, do a random

View file

@ -5,7 +5,7 @@ pub use aquatic_peer_id::{PeerClient, PeerId};
use zerocopy::network_endian::{I32, I64, U16, U32}; use zerocopy::network_endian::{I32, I64, U16, U32};
use zerocopy::{AsBytes, FromBytes, FromZeroes}; use zerocopy::{AsBytes, FromBytes, FromZeroes};
pub trait Ip: Clone + Copy + Debug + PartialEq + Eq + AsBytes {} pub trait Ip: Clone + Copy + Debug + PartialEq + Eq + std::hash::Hash + AsBytes {}
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)]
#[repr(transparent)] #[repr(transparent)]
@ -91,7 +91,7 @@ impl PeerKey {
} }
} }
#[derive(PartialEq, Eq, Clone, Copy, Debug, AsBytes, FromBytes, FromZeroes)] #[derive(PartialEq, Eq, Clone, Copy, Debug, Hash, AsBytes, FromBytes, FromZeroes)]
#[repr(C, packed)] #[repr(C, packed)]
pub struct ResponsePeer<I: Ip> { pub struct ResponsePeer<I: Ip> {
pub ip_address: I, pub ip_address: I,