mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
http_private: more work on announce route and channels
This commit is contained in:
parent
7d3ff2e9e6
commit
b0f89edd30
3 changed files with 52 additions and 23 deletions
|
|
@ -5,7 +5,7 @@ use tokio::sync::oneshot::Sender;
|
||||||
use super::socket::db::ValidatedAnnounceRequest;
|
use super::socket::db::ValidatedAnnounceRequest;
|
||||||
|
|
||||||
pub struct ChannelAnnounceRequest {
|
pub struct ChannelAnnounceRequest {
|
||||||
request: ValidatedAnnounceRequest,
|
pub request: ValidatedAnnounceRequest,
|
||||||
source_addr: CanonicalSocketAddr,
|
pub source_addr: CanonicalSocketAddr,
|
||||||
response_sender: Sender<AnnounceResponse>,
|
pub response_sender: Sender<AnnounceResponse>,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,8 @@
|
||||||
use std::net::{IpAddr, SocketAddr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
|
|
||||||
use aquatic_http_protocol::{common::AnnounceEvent, request::AnnounceRequest, response::FailureResponse};
|
use aquatic_http_protocol::{
|
||||||
|
common::AnnounceEvent, request::AnnounceRequest, response::FailureResponse,
|
||||||
|
};
|
||||||
use sqlx::{Executor, MySql, Pool};
|
use sqlx::{Executor, MySql, Pool};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
@ -62,19 +64,19 @@ pub async fn validate_announce_request(
|
||||||
user_token: String,
|
user_token: String,
|
||||||
request: AnnounceRequest,
|
request: AnnounceRequest,
|
||||||
) -> Result<ValidatedAnnounceRequest, FailureResponse> {
|
) -> Result<ValidatedAnnounceRequest, FailureResponse> {
|
||||||
let parameters = AnnounceProcedureParameters::new(
|
let parameters =
|
||||||
source_addr,
|
AnnounceProcedureParameters::new(source_addr, user_agent, user_token, &request);
|
||||||
user_agent,
|
|
||||||
user_token,
|
|
||||||
&request,
|
|
||||||
);
|
|
||||||
|
|
||||||
match call_announce_procedure(pool, parameters).await {
|
match call_announce_procedure(pool, parameters).await {
|
||||||
Ok(results) => {
|
Ok(results) => {
|
||||||
if results.announce_allowed {
|
if results.announce_allowed {
|
||||||
Ok(ValidatedAnnounceRequest(request))
|
Ok(ValidatedAnnounceRequest(request))
|
||||||
} else {
|
} else {
|
||||||
Err(FailureResponse::new(results.failure_reason.unwrap_or_else(|| "Not allowed".into())))
|
Err(FailureResponse::new(
|
||||||
|
results
|
||||||
|
.failure_reason
|
||||||
|
.unwrap_or_else(|| "Not allowed".into()),
|
||||||
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,21 @@
|
||||||
|
use aquatic_common::CanonicalSocketAddr;
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::{ConnectInfo, Path, RawQuery},
|
extract::{ConnectInfo, Path, RawQuery},
|
||||||
headers::UserAgent,
|
headers::UserAgent,
|
||||||
http::StatusCode,
|
http::StatusCode,
|
||||||
|
response::IntoResponse,
|
||||||
Extension, TypedHeader,
|
Extension, TypedHeader,
|
||||||
};
|
};
|
||||||
use sqlx::mysql::MySqlPool;
|
use sqlx::mysql::MySqlPool;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
use aquatic_http_protocol::{request::AnnounceRequest, response::FailureResponse};
|
use aquatic_http_protocol::{
|
||||||
|
request::AnnounceRequest,
|
||||||
|
response::{AnnounceResponse, FailureResponse, Response},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::workers::common::ChannelAnnounceRequest;
|
||||||
|
|
||||||
use super::db;
|
use super::db;
|
||||||
|
|
||||||
|
|
@ -17,23 +25,42 @@ pub async fn announce(
|
||||||
opt_user_agent: Option<TypedHeader<UserAgent>>,
|
opt_user_agent: Option<TypedHeader<UserAgent>>,
|
||||||
Path(user_token): Path<String>,
|
Path(user_token): Path<String>,
|
||||||
RawQuery(query): RawQuery,
|
RawQuery(query): RawQuery,
|
||||||
) -> Result<String, (StatusCode, String)> {
|
) -> Result<(StatusCode, impl IntoResponse), (StatusCode, impl IntoResponse)> {
|
||||||
let request = AnnounceRequest::from_query_string(&query.unwrap_or_else(|| "".into()))
|
let request = AnnounceRequest::from_query_string(&query.unwrap_or_else(|| "".into()))
|
||||||
.map_err(anyhow_error)?;
|
.map_err(|err| build_response(Response::Failure(FailureResponse::new("Internal error"))))?;
|
||||||
|
|
||||||
let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned());
|
let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned());
|
||||||
|
|
||||||
let validated_request = db::validate_announce_request(&pool, peer_addr, opt_user_agent, user_token, request).await.map_err(failure_response)?;
|
let validated_request =
|
||||||
|
db::validate_announce_request(&pool, peer_addr, opt_user_agent, user_token, request)
|
||||||
|
.await
|
||||||
|
.map_err(|r| build_response(Response::Failure(r)))?;
|
||||||
|
|
||||||
// TODO: send request to request worker, await oneshot channel response
|
let (response_sender, response_receiver) = oneshot::channel();
|
||||||
|
|
||||||
Ok(format!("{:?}", validated_request))
|
let canonical_socket_addr = CanonicalSocketAddr::new(peer_addr);
|
||||||
|
|
||||||
|
let channel_request = ChannelAnnounceRequest {
|
||||||
|
request: validated_request,
|
||||||
|
source_addr: canonical_socket_addr,
|
||||||
|
response_sender,
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: send request to request worker
|
||||||
|
|
||||||
|
let response = response_receiver.await.map_err(|err| {
|
||||||
|
::log::error!("channel response sender closed: {}", err);
|
||||||
|
|
||||||
|
build_response(Response::Failure(FailureResponse::new("Internal error")))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(build_response(Response::Announce(response)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn anyhow_error(err: anyhow::Error) -> (StatusCode, String) {
|
fn build_response(response: Response) -> (StatusCode, impl IntoResponse) {
|
||||||
(StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
|
let mut response_bytes = Vec::with_capacity(512);
|
||||||
}
|
|
||||||
|
|
||||||
fn failure_response(response: FailureResponse) -> (StatusCode, String) {
|
response.write(&mut response_bytes);
|
||||||
(StatusCode::OK, format!("{:?}", response))
|
|
||||||
|
(StatusCode::OK, response_bytes)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue