mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
ws: allow peers to use multiple PeerIds, but only one per torrent
This commit is contained in:
parent
92794444f9
commit
e0c4a4eaf7
1 changed files with 41 additions and 39 deletions
|
|
@ -24,7 +24,8 @@ use glommio::net::{TcpListener, TcpStream};
|
||||||
use glommio::task::JoinHandle;
|
use glommio::task::JoinHandle;
|
||||||
use glommio::timer::{sleep, timeout, TimerActionRepeat};
|
use glommio::timer::{sleep, timeout, TimerActionRepeat};
|
||||||
use glommio::{enclose, prelude::*};
|
use glommio::{enclose, prelude::*};
|
||||||
use hashbrown::{HashMap, HashSet};
|
use hashbrown::hash_map::Entry;
|
||||||
|
use hashbrown::HashMap;
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
|
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
|
|
@ -45,8 +46,7 @@ struct ConnectionReference {
|
||||||
out_message_sender: Rc<LocalSender<(OutMessageMeta, OutMessage)>>,
|
out_message_sender: Rc<LocalSender<(OutMessageMeta, OutMessage)>>,
|
||||||
/// Updated after sending message to peer
|
/// Updated after sending message to peer
|
||||||
valid_until: ValidUntil,
|
valid_until: ValidUntil,
|
||||||
peer_id: Option<PeerId>,
|
announced_info_hashes: HashMap<InfoHash, PeerId>,
|
||||||
announced_info_hashes: HashSet<InfoHash>,
|
|
||||||
ip_version: IpVersion,
|
ip_version: IpVersion,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -145,7 +145,6 @@ pub async fn run_socket_worker(
|
||||||
server_start_instant,
|
server_start_instant,
|
||||||
config.cleaning.max_connection_idle,
|
config.cleaning.max_connection_idle,
|
||||||
),
|
),
|
||||||
peer_id: None,
|
|
||||||
announced_info_hashes: Default::default(),
|
announced_info_hashes: Default::default(),
|
||||||
ip_version,
|
ip_version,
|
||||||
});
|
});
|
||||||
|
|
@ -180,26 +179,24 @@ pub async fn run_socket_worker(
|
||||||
|
|
||||||
// Tell swarm workers to remove peer
|
// Tell swarm workers to remove peer
|
||||||
if let Some(reference) = opt_reference {
|
if let Some(reference) = opt_reference {
|
||||||
if let Some(peer_id) = reference.peer_id {
|
for (info_hash, peer_id) in reference.announced_info_hashes {
|
||||||
for info_hash in reference.announced_info_hashes {
|
let message = SwarmControlMessage::ConnectionClosed {
|
||||||
let message = SwarmControlMessage::ConnectionClosed {
|
info_hash,
|
||||||
info_hash,
|
peer_id,
|
||||||
peer_id,
|
ip_version: reference.ip_version,
|
||||||
ip_version: reference.ip_version,
|
};
|
||||||
};
|
|
||||||
|
|
||||||
let consumer_index =
|
let consumer_index =
|
||||||
calculate_in_message_consumer_index(&config, info_hash);
|
calculate_in_message_consumer_index(&config, info_hash);
|
||||||
|
|
||||||
// Only fails when receiver is closed
|
// Only fails when receiver is closed
|
||||||
control_message_senders
|
control_message_senders
|
||||||
.send_to(
|
.send_to(
|
||||||
consumer_index,
|
consumer_index,
|
||||||
message
|
message
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}), tq_regular)
|
}), tq_regular)
|
||||||
|
|
@ -499,26 +496,31 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// Store peer id / check if stored peer id matches
|
// Store peer id / check if stored peer id matches
|
||||||
match &mut connection_reference.peer_id {
|
match connection_reference
|
||||||
Some(peer_id) if *peer_id != announce_request.peer_id => {
|
.announced_info_hashes
|
||||||
self.send_error_response(
|
.entry(announce_request.info_hash)
|
||||||
"Only one peer id can be used per connection".into(),
|
{
|
||||||
Some(ErrorResponseAction::Announce),
|
Entry::Occupied(entry) => {
|
||||||
Some(info_hash),
|
if *entry.get() != announce_request.peer_id {
|
||||||
)
|
// Drop Rc borrow before awaiting
|
||||||
.await?;
|
drop(connection_slab);
|
||||||
return Err(anyhow::anyhow!("Peer used more than one PeerId"));
|
|
||||||
|
self.send_error_response(
|
||||||
|
"Only one peer id can be used per torrent".into(),
|
||||||
|
Some(ErrorResponseAction::Announce),
|
||||||
|
Some(info_hash),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"Peer used more than one PeerId for a single torrent"
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Some(_) => (),
|
Entry::Vacant(entry) => {
|
||||||
opt_peer_id @ None => {
|
entry.insert(announce_request.peer_id);
|
||||||
*opt_peer_id = Some(announce_request.peer_id);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remember info hash for later
|
|
||||||
connection_reference
|
|
||||||
.announced_info_hashes
|
|
||||||
.insert(announce_request.info_hash);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let in_message = InMessage::AnnounceRequest(announce_request);
|
let in_message = InMessage::AnnounceRequest(announce_request);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue