ws: store peer_id and announced info hashes in ConnectionReference

This commit is contained in:
Joakim Frostegård 2022-07-05 12:03:51 +02:00
parent b06c12e9a5
commit b30da1a930

View file

@ -25,7 +25,7 @@ use glommio::net::{TcpListener, TcpStream};
use glommio::task::JoinHandle;
use glommio::timer::{sleep, timeout, TimerActionRepeat};
use glommio::{enclose, prelude::*};
use hashbrown::HashMap;
use hashbrown::{HashMap, HashSet};
use slab::Slab;
use crate::config::Config;
@ -46,6 +46,8 @@ struct ConnectionReference {
out_message_sender: Rc<LocalSender<(ConnectionMeta, OutMessage)>>,
/// Updated after sending message to peer
valid_until: ValidUntil,
peer_id: Option<PeerId>,
announced_info_hashes: HashSet<InfoHash>,
}
pub async fn run_socket_worker(
@ -112,6 +114,8 @@ pub async fn run_socket_worker(
task_handle: None,
out_message_sender: out_message_sender.clone(),
valid_until: ValidUntil::new(config.cleaning.max_connection_idle),
peer_id: None,
announced_info_hashes: Default::default(),
});
::log::info!("accepting stream: {}", key);
@ -240,10 +244,11 @@ async fn run_connection(
let access_list_cache = create_access_list_cache(&access_list);
let reader_handle = spawn_local_into(
enclose!((config, pending_scrape_slab) async move {
enclose!((config, connection_slab, pending_scrape_slab) async move {
let mut reader = ConnectionReader {
config,
access_list_cache,
connection_slab,
in_message_senders,
out_message_sender,
pending_scrape_slab,
@ -289,6 +294,7 @@ async fn run_connection(
struct ConnectionReader {
config: Rc<Config>,
access_list_cache: AccessListCache,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
in_message_senders: Rc<Senders<(ConnectionMeta, InMessage)>>,
out_message_sender: Rc<LocalSender<(ConnectionMeta, OutMessage)>>,
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
@ -320,7 +326,7 @@ impl ConnectionReader {
Err(err) => {
::log::debug!("Couldn't parse in_message: {:?}", err);
self.send_error_response("Invalid request".into(), None)
self.send_error_response("Invalid request".into(), None, None)
.await?;
}
}
@ -339,6 +345,39 @@ impl ConnectionReader {
.load()
.allows(self.config.access_list.mode, &info_hash.0)
{
{
let mut connection_slab = self.connection_slab.borrow_mut();
let connection_reference = connection_slab
.get_mut(self.connection_id.0)
.ok_or_else(|| {
anyhow::anyhow!(
"connection reference {} not found in slab",
self.connection_id.0
)
})?;
match &mut connection_reference.peer_id {
Some(peer_id) if *peer_id != announce_request.peer_id => {
self.send_error_response(
"Only one peer id can be used per connection".into(),
Some(ErrorResponseAction::Announce),
Some(info_hash),
)
.await?;
return Err(anyhow::anyhow!("Peer used more than one PeerId"));
}
Some(_) => (),
opt_peer_id @ None => {
*opt_peer_id = Some(announce_request.peer_id);
}
}
connection_reference
.announced_info_hashes
.insert(announce_request.info_hash);
}
let in_message = InMessage::AnnounceRequest(announce_request);
let consumer_index =
@ -354,7 +393,11 @@ impl ConnectionReader {
.unwrap();
::log::info!("sent message to swarm worker");
} else {
self.send_error_response("Info hash not allowed".into(), Some(info_hash))
self.send_error_response(
"Info hash not allowed".into(),
Some(ErrorResponseAction::Announce),
Some(info_hash),
)
.await?;
}
}
@ -364,7 +407,11 @@ impl ConnectionReader {
} else {
// If request.info_hashes is empty, don't return scrape for all
// torrents, even though reference server does it. It is too expensive.
self.send_error_response("Full scrapes are not allowed".into(), None)
self.send_error_response(
"Full scrapes are not allowed".into(),
Some(ErrorResponseAction::Scrape),
None,
)
.await?;
return Ok(());
@ -415,10 +462,11 @@ impl ConnectionReader {
async fn send_error_response(
&self,
failure_reason: Cow<'static, str>,
action: Option<ErrorResponseAction>,
info_hash: Option<InfoHash>,
) -> anyhow::Result<()> {
let out_message = OutMessage::ErrorResponse(ErrorResponse {
action: Some(ErrorResponseAction::Scrape),
action,
failure_reason,
info_hash,
});