mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
ws: improve socket worker connection code
This commit is contained in:
parent
fe5ccf6646
commit
7b2a7a4f46
1 changed files with 229 additions and 242 deletions
|
|
@ -229,11 +229,14 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
|
|||
match &message {
|
||||
tungstenite::Message::Text(_) | tungstenite::Message::Binary(_) => {
|
||||
match InMessage::from_ws_message(message) {
|
||||
Ok(in_message) => {
|
||||
self.handle_in_message(in_message).await?;
|
||||
Ok(InMessage::AnnounceRequest(request)) => {
|
||||
self.handle_announce_request(request).await?;
|
||||
}
|
||||
Ok(InMessage::ScrapeRequest(request)) => {
|
||||
self.handle_scrape_request(request).await?;
|
||||
}
|
||||
Err(err) => {
|
||||
::log::debug!("Couldn't parse in_message: {:?}", err);
|
||||
::log::debug!("Couldn't parse in_message: {:#}", err);
|
||||
|
||||
self.send_error_response("Invalid request".into(), None, None)
|
||||
.await?;
|
||||
|
|
@ -261,171 +264,167 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionReader<S> {
|
|||
}
|
||||
}
|
||||
|
||||
async fn handle_in_message(&mut self, in_message: InMessage) -> anyhow::Result<()> {
|
||||
match in_message {
|
||||
InMessage::AnnounceRequest(announce_request) => {
|
||||
#[cfg(feature = "metrics")]
|
||||
::metrics::increment_counter!(
|
||||
"aquatic_requests_total",
|
||||
"type" => "announce",
|
||||
"ip_version" => ip_version_to_metrics_str(self.ip_version),
|
||||
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
|
||||
);
|
||||
async fn handle_announce_request(&mut self, request: AnnounceRequest) -> anyhow::Result<()> {
|
||||
#[cfg(feature = "metrics")]
|
||||
::metrics::increment_counter!(
|
||||
"aquatic_requests_total",
|
||||
"type" => "announce",
|
||||
"ip_version" => ip_version_to_metrics_str(self.ip_version),
|
||||
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
|
||||
);
|
||||
|
||||
let info_hash = announce_request.info_hash;
|
||||
let info_hash = request.info_hash;
|
||||
|
||||
if self
|
||||
.access_list_cache
|
||||
.load()
|
||||
.allows(self.config.access_list.mode, &info_hash.0)
|
||||
{
|
||||
let mut announced_info_hashes =
|
||||
self.clean_up_data.announced_info_hashes.borrow_mut();
|
||||
if self
|
||||
.access_list_cache
|
||||
.load()
|
||||
.allows(self.config.access_list.mode, &info_hash.0)
|
||||
{
|
||||
let mut announced_info_hashes = self.clean_up_data.announced_info_hashes.borrow_mut();
|
||||
|
||||
// Store peer id / check if stored peer id matches
|
||||
match announced_info_hashes.entry(announce_request.info_hash) {
|
||||
Entry::Occupied(entry) => {
|
||||
if *entry.get() != announce_request.peer_id {
|
||||
// Drop Rc borrow before awaiting
|
||||
drop(announced_info_hashes);
|
||||
// Store peer id / check if stored peer id matches
|
||||
match announced_info_hashes.entry(request.info_hash) {
|
||||
Entry::Occupied(entry) => {
|
||||
if *entry.get() != request.peer_id {
|
||||
// Drop Rc borrow before awaiting
|
||||
drop(announced_info_hashes);
|
||||
|
||||
self.send_error_response(
|
||||
"Only one peer id can be used per torrent".into(),
|
||||
Some(ErrorResponseAction::Announce),
|
||||
Some(info_hash),
|
||||
)
|
||||
.await?;
|
||||
|
||||
return Err(anyhow::anyhow!(
|
||||
"Peer used more than one PeerId for a single torrent"
|
||||
));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(announce_request.peer_id);
|
||||
|
||||
// Set peer client info if not set
|
||||
#[cfg(feature = "metrics")]
|
||||
if self.config.metrics.run_prometheus_endpoint
|
||||
&& self.config.metrics.peer_clients
|
||||
&& self.clean_up_data.opt_peer_client.borrow().is_none()
|
||||
{
|
||||
let peer_id = aquatic_peer_id::PeerId(announce_request.peer_id.0);
|
||||
let client = peer_id.client();
|
||||
let prefix = peer_id.first_8_bytes_hex().to_string();
|
||||
|
||||
::metrics::increment_gauge!(
|
||||
"aquatic_peer_clients",
|
||||
1.0,
|
||||
"client" => client.to_string(),
|
||||
);
|
||||
|
||||
if self.config.metrics.peer_id_prefixes {
|
||||
::metrics::increment_gauge!(
|
||||
"aquatic_peer_id_prefixes",
|
||||
1.0,
|
||||
"prefix_hex" => prefix.to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
*self.clean_up_data.opt_peer_client.borrow_mut() =
|
||||
Some((client, prefix));
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(AnnounceEvent::Stopped) = announce_request.event {
|
||||
announced_info_hashes.remove(&announce_request.info_hash);
|
||||
}
|
||||
|
||||
// Drop Rc borrow before awaiting
|
||||
drop(announced_info_hashes);
|
||||
|
||||
let in_message = InMessage::AnnounceRequest(announce_request);
|
||||
|
||||
let consumer_index =
|
||||
calculate_in_message_consumer_index(&self.config, info_hash);
|
||||
|
||||
// Only fails when receiver is closed
|
||||
self.in_message_senders
|
||||
.send_to(
|
||||
consumer_index,
|
||||
(self.make_connection_meta(None), in_message),
|
||||
self.send_error_response(
|
||||
"Only one peer id can be used per torrent".into(),
|
||||
Some(ErrorResponseAction::Announce),
|
||||
Some(info_hash),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
self.send_error_response(
|
||||
"Info hash not allowed".into(),
|
||||
Some(ErrorResponseAction::Announce),
|
||||
Some(info_hash),
|
||||
)
|
||||
.await?;
|
||||
.await?;
|
||||
|
||||
return Err(anyhow::anyhow!(
|
||||
"Peer used more than one PeerId for a single torrent"
|
||||
));
|
||||
}
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(request.peer_id);
|
||||
|
||||
// Set peer client info if not set
|
||||
#[cfg(feature = "metrics")]
|
||||
if self.config.metrics.run_prometheus_endpoint
|
||||
&& self.config.metrics.peer_clients
|
||||
&& self.clean_up_data.opt_peer_client.borrow().is_none()
|
||||
{
|
||||
let peer_id = aquatic_peer_id::PeerId(request.peer_id.0);
|
||||
let client = peer_id.client();
|
||||
let prefix = peer_id.first_8_bytes_hex().to_string();
|
||||
|
||||
::metrics::increment_gauge!(
|
||||
"aquatic_peer_clients",
|
||||
1.0,
|
||||
"client" => client.to_string(),
|
||||
);
|
||||
|
||||
if self.config.metrics.peer_id_prefixes {
|
||||
::metrics::increment_gauge!(
|
||||
"aquatic_peer_id_prefixes",
|
||||
1.0,
|
||||
"prefix_hex" => prefix.to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
*self.clean_up_data.opt_peer_client.borrow_mut() = Some((client, prefix));
|
||||
};
|
||||
}
|
||||
}
|
||||
InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => {
|
||||
#[cfg(feature = "metrics")]
|
||||
::metrics::increment_counter!(
|
||||
"aquatic_requests_total",
|
||||
"type" => "scrape",
|
||||
"ip_version" => ip_version_to_metrics_str(self.ip_version),
|
||||
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
|
||||
);
|
||||
|
||||
let info_hashes = if let Some(info_hashes) = info_hashes {
|
||||
info_hashes
|
||||
} else {
|
||||
// If request.info_hashes is empty, don't return scrape for all
|
||||
// torrents, even though reference server does it. It is too expensive.
|
||||
self.send_error_response(
|
||||
"Full scrapes are not allowed".into(),
|
||||
Some(ErrorResponseAction::Scrape),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();
|
||||
|
||||
for info_hash in info_hashes.as_vec() {
|
||||
let info_hashes = info_hashes_by_worker
|
||||
.entry(calculate_in_message_consumer_index(&self.config, info_hash))
|
||||
.or_default();
|
||||
|
||||
info_hashes.push(info_hash);
|
||||
}
|
||||
|
||||
let pending_worker_out_messages = info_hashes_by_worker.len();
|
||||
|
||||
let pending_scrape_response = PendingScrapeResponse {
|
||||
pending_worker_out_messages,
|
||||
stats: Default::default(),
|
||||
};
|
||||
|
||||
let pending_scrape_id: u8 = self
|
||||
.pending_scrape_slab
|
||||
.borrow_mut()
|
||||
.insert(pending_scrape_response)
|
||||
.try_into()
|
||||
.with_context(|| "Reached 256 pending scrape responses")?;
|
||||
|
||||
let meta = self.make_connection_meta(Some(PendingScrapeId(pending_scrape_id)));
|
||||
|
||||
for (consumer_index, info_hashes) in info_hashes_by_worker {
|
||||
let in_message = InMessage::ScrapeRequest(ScrapeRequest {
|
||||
action: ScrapeAction::Scrape,
|
||||
info_hashes: Some(ScrapeRequestInfoHashes::Multiple(info_hashes)),
|
||||
});
|
||||
|
||||
// Only fails when receiver is closed
|
||||
self.in_message_senders
|
||||
.send_to(consumer_index, (meta, in_message))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
if let Some(AnnounceEvent::Stopped) = request.event {
|
||||
announced_info_hashes.remove(&request.info_hash);
|
||||
}
|
||||
|
||||
// Drop Rc borrow before awaiting
|
||||
drop(announced_info_hashes);
|
||||
|
||||
let in_message = InMessage::AnnounceRequest(request);
|
||||
|
||||
let consumer_index = calculate_in_message_consumer_index(&self.config, info_hash);
|
||||
|
||||
// Only fails when receiver is closed
|
||||
self.in_message_senders
|
||||
.send_to(
|
||||
consumer_index,
|
||||
(self.make_connection_meta(None), in_message),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
} else {
|
||||
self.send_error_response(
|
||||
"Info hash not allowed".into(),
|
||||
Some(ErrorResponseAction::Announce),
|
||||
Some(info_hash),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_scrape_request(&mut self, request: ScrapeRequest) -> anyhow::Result<()> {
|
||||
#[cfg(feature = "metrics")]
|
||||
::metrics::increment_counter!(
|
||||
"aquatic_requests_total",
|
||||
"type" => "scrape",
|
||||
"ip_version" => ip_version_to_metrics_str(self.ip_version),
|
||||
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
|
||||
);
|
||||
|
||||
let info_hashes = if let Some(info_hashes) = request.info_hashes {
|
||||
info_hashes
|
||||
} else {
|
||||
// If request.info_hashes is empty, don't return scrape for all
|
||||
// torrents, even though reference server does it. It is too expensive.
|
||||
self.send_error_response(
|
||||
"Full scrapes are not allowed".into(),
|
||||
Some(ErrorResponseAction::Scrape),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let mut info_hashes_by_worker: BTreeMap<usize, Vec<InfoHash>> = BTreeMap::new();
|
||||
|
||||
for info_hash in info_hashes.as_vec() {
|
||||
let info_hashes = info_hashes_by_worker
|
||||
.entry(calculate_in_message_consumer_index(&self.config, info_hash))
|
||||
.or_default();
|
||||
|
||||
info_hashes.push(info_hash);
|
||||
}
|
||||
|
||||
let pending_worker_out_messages = info_hashes_by_worker.len();
|
||||
|
||||
let pending_scrape_response = PendingScrapeResponse {
|
||||
pending_worker_out_messages,
|
||||
stats: Default::default(),
|
||||
};
|
||||
|
||||
let pending_scrape_id: u8 = self
|
||||
.pending_scrape_slab
|
||||
.borrow_mut()
|
||||
.insert(pending_scrape_response)
|
||||
.try_into()
|
||||
.with_context(|| "Reached 256 pending scrape responses")?;
|
||||
|
||||
let meta = self.make_connection_meta(Some(PendingScrapeId(pending_scrape_id)));
|
||||
|
||||
for (consumer_index, info_hashes) in info_hashes_by_worker {
|
||||
let in_message = InMessage::ScrapeRequest(ScrapeRequest {
|
||||
action: ScrapeAction::Scrape,
|
||||
info_hashes: Some(ScrapeRequestInfoHashes::Multiple(info_hashes)),
|
||||
});
|
||||
|
||||
// Only fails when receiver is closed
|
||||
self.in_message_senders
|
||||
.send_to(consumer_index, (meta, in_message))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
@ -485,31 +484,28 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
|
|||
.pending_scrape_id
|
||||
.expect("meta.pending_scrape_id not set");
|
||||
|
||||
let finished = if let Some(pending) = Slab::get_mut(
|
||||
&mut RefCell::borrow_mut(&self.pending_scrape_slab),
|
||||
pending_scrape_id.0 as usize,
|
||||
) {
|
||||
pending.stats.extend(out_message.files);
|
||||
pending.pending_worker_out_messages -= 1;
|
||||
let mut pending_responses = self.pending_scrape_slab.borrow_mut();
|
||||
|
||||
pending.pending_worker_out_messages == 0
|
||||
} else {
|
||||
return Err(anyhow::anyhow!("pending scrape not found in slab"));
|
||||
};
|
||||
let pending_response = pending_responses
|
||||
.get_mut(pending_scrape_id.0 as usize)
|
||||
.ok_or(anyhow::anyhow!("pending scrape not found in slab"))?;
|
||||
|
||||
if finished {
|
||||
let out_message = {
|
||||
let mut slab = RefCell::borrow_mut(&self.pending_scrape_slab);
|
||||
pending_response.stats.extend(out_message.files);
|
||||
pending_response.pending_worker_out_messages -= 1;
|
||||
|
||||
let pending = slab.remove(pending_scrape_id.0 as usize);
|
||||
if pending_response.pending_worker_out_messages == 0 {
|
||||
let pending_response =
|
||||
pending_responses.remove(pending_scrape_id.0 as usize);
|
||||
|
||||
slab.shrink_to_fit();
|
||||
pending_responses.shrink_to_fit();
|
||||
|
||||
OutMessage::ScrapeResponse(ScrapeResponse {
|
||||
action: ScrapeAction::Scrape,
|
||||
files: pending.stats,
|
||||
})
|
||||
};
|
||||
let out_message = OutMessage::ScrapeResponse(ScrapeResponse {
|
||||
action: ScrapeAction::Scrape,
|
||||
files: pending_response.stats,
|
||||
});
|
||||
|
||||
// Drop Rc borrow before awaiting
|
||||
drop(pending_responses);
|
||||
|
||||
self.send_out_message(&out_message).await?;
|
||||
}
|
||||
|
|
@ -522,72 +518,63 @@ impl<S: futures::AsyncRead + futures::AsyncWrite + Unpin> ConnectionWriter<S> {
|
|||
}
|
||||
|
||||
async fn send_out_message(&mut self, out_message: &OutMessage) -> anyhow::Result<()> {
|
||||
let result = timeout(Duration::from_secs(10), async {
|
||||
let result =
|
||||
futures::SinkExt::send(&mut self.ws_out, out_message.to_ws_message()).await;
|
||||
|
||||
Ok(result)
|
||||
timeout(Duration::from_secs(10), async {
|
||||
Ok(futures::SinkExt::send(&mut self.ws_out, out_message.to_ws_message()).await)
|
||||
})
|
||||
.await;
|
||||
.await
|
||||
.map_err(|err| {
|
||||
anyhow::anyhow!("send_out_message: sending to peer took too long: {:#}", err)
|
||||
})?
|
||||
.with_context(|| "send_out_message")?;
|
||||
|
||||
match result {
|
||||
Ok(Ok(())) => {
|
||||
if let OutMessage::AnnounceResponse(_) | OutMessage::ScrapeResponse(_) = out_message
|
||||
{
|
||||
*self.connection_valid_until.borrow_mut() = ValidUntil::new(
|
||||
self.server_start_instant,
|
||||
self.config.cleaning.max_connection_idle,
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
{
|
||||
let out_message_type = match &out_message {
|
||||
OutMessage::OfferOutMessage(_) => "offer",
|
||||
OutMessage::AnswerOutMessage(_) => "offer_answer",
|
||||
OutMessage::AnnounceResponse(_) => "announce",
|
||||
OutMessage::ScrapeResponse(_) => "scrape",
|
||||
OutMessage::ErrorResponse(_) => "error",
|
||||
};
|
||||
|
||||
::metrics::increment_counter!(
|
||||
"aquatic_responses_total",
|
||||
"type" => out_message_type,
|
||||
"ip_version" => ip_version_to_metrics_str(self.ip_version),
|
||||
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
|
||||
);
|
||||
|
||||
if let Some((peer_client, prefix)) =
|
||||
self.clean_up_data.opt_peer_client.borrow().as_ref()
|
||||
{
|
||||
// As long as connection is still alive, increment peer client
|
||||
// gauges by zero to prevent them from being removed due to
|
||||
// idleness
|
||||
|
||||
::metrics::increment_gauge!(
|
||||
"aquatic_peer_clients",
|
||||
0.0,
|
||||
"client" => peer_client.to_string(),
|
||||
);
|
||||
|
||||
if self.config.metrics.peer_id_prefixes {
|
||||
::metrics::increment_gauge!(
|
||||
"aquatic_peer_id_prefixes",
|
||||
0.0,
|
||||
"prefix_hex" => prefix.to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(Err(err)) => Err(err.into()),
|
||||
Err(err) => Err(anyhow::anyhow!(
|
||||
"send_out_message: sending to peer took too long: {}",
|
||||
err
|
||||
)),
|
||||
if let OutMessage::AnnounceResponse(_) | OutMessage::ScrapeResponse(_) = out_message {
|
||||
*self.connection_valid_until.borrow_mut() = ValidUntil::new(
|
||||
self.server_start_instant,
|
||||
self.config.cleaning.max_connection_idle,
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(feature = "metrics")]
|
||||
{
|
||||
let out_message_type = match &out_message {
|
||||
OutMessage::OfferOutMessage(_) => "offer",
|
||||
OutMessage::AnswerOutMessage(_) => "offer_answer",
|
||||
OutMessage::AnnounceResponse(_) => "announce",
|
||||
OutMessage::ScrapeResponse(_) => "scrape",
|
||||
OutMessage::ErrorResponse(_) => "error",
|
||||
};
|
||||
|
||||
::metrics::increment_counter!(
|
||||
"aquatic_responses_total",
|
||||
"type" => out_message_type,
|
||||
"ip_version" => ip_version_to_metrics_str(self.ip_version),
|
||||
"worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(),
|
||||
);
|
||||
|
||||
if let Some((peer_client, prefix)) =
|
||||
self.clean_up_data.opt_peer_client.borrow().as_ref()
|
||||
{
|
||||
// As long as connection is still alive, increment peer client
|
||||
// gauges by zero to prevent them from being removed due to
|
||||
// idleness
|
||||
|
||||
::metrics::increment_gauge!(
|
||||
"aquatic_peer_clients",
|
||||
0.0,
|
||||
"client" => peer_client.to_string(),
|
||||
);
|
||||
|
||||
if self.config.metrics.peer_id_prefixes {
|
||||
::metrics::increment_gauge!(
|
||||
"aquatic_peer_id_prefixes",
|
||||
0.0,
|
||||
"prefix_hex" => prefix.to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue