mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
WIP: http_private: work on channel types
This commit is contained in:
parent
d4a89ee808
commit
0fb572556f
7 changed files with 85 additions and 31 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -151,6 +151,7 @@ dependencies = [
|
||||||
"axum",
|
"axum",
|
||||||
"dotenv",
|
"dotenv",
|
||||||
"hex",
|
"hex",
|
||||||
|
"log",
|
||||||
"mimalloc",
|
"mimalloc",
|
||||||
"serde",
|
"serde",
|
||||||
"socket2 0.4.4",
|
"socket2 0.4.4",
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ anyhow = "1"
|
||||||
axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] }
|
axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] }
|
||||||
dotenv = "0.15"
|
dotenv = "0.15"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
|
log = "0.4"
|
||||||
mimalloc = { version = "0.1", default-features = false }
|
mimalloc = { version = "0.1", default-features = false }
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
socket2 = { version = "0.4", features = ["all"] }
|
socket2 = { version = "0.4", features = ["all"] }
|
||||||
|
|
|
||||||
11
aquatic_http_private/src/workers/common.rs
Normal file
11
aquatic_http_private/src/workers/common.rs
Normal file
|
|
@ -0,0 +1,11 @@
|
||||||
|
use aquatic_common::CanonicalSocketAddr;
|
||||||
|
use aquatic_http_protocol::response::AnnounceResponse;
|
||||||
|
use tokio::sync::oneshot::Sender;
|
||||||
|
|
||||||
|
use super::socket::db::ValidatedAnnounceRequest;
|
||||||
|
|
||||||
|
pub struct ChannelAnnounceRequest {
|
||||||
|
request: ValidatedAnnounceRequest,
|
||||||
|
source_addr: CanonicalSocketAddr,
|
||||||
|
response_sender: Sender<AnnounceResponse>,
|
||||||
|
}
|
||||||
|
|
@ -1,2 +1,3 @@
|
||||||
|
pub mod common;
|
||||||
pub mod request;
|
pub mod request;
|
||||||
pub mod socket;
|
pub mod socket;
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,19 @@
|
||||||
use std::net::{IpAddr, SocketAddr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
|
|
||||||
use aquatic_http_protocol::{common::AnnounceEvent, request::AnnounceRequest};
|
use aquatic_http_protocol::{common::AnnounceEvent, request::AnnounceRequest, response::FailureResponse};
|
||||||
use sqlx::{Executor, MySql, Pool};
|
use sqlx::{Executor, MySql, Pool};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct DbAnnounceRequest {
|
pub struct ValidatedAnnounceRequest(AnnounceRequest);
|
||||||
|
|
||||||
|
impl Into<AnnounceRequest> for ValidatedAnnounceRequest {
|
||||||
|
fn into(self) -> AnnounceRequest {
|
||||||
|
self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct AnnounceProcedureParameters {
|
||||||
source_ip: IpAddr,
|
source_ip: IpAddr,
|
||||||
source_port: u16,
|
source_port: u16,
|
||||||
user_agent: Option<String>,
|
user_agent: Option<String>,
|
||||||
|
|
@ -17,12 +26,12 @@ pub struct DbAnnounceRequest {
|
||||||
left: u64,
|
left: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DbAnnounceRequest {
|
impl AnnounceProcedureParameters {
|
||||||
pub fn new(
|
fn new(
|
||||||
source_addr: SocketAddr,
|
source_addr: SocketAddr,
|
||||||
user_agent: Option<String>,
|
user_agent: Option<String>,
|
||||||
user_token: String, // FIXME: length
|
user_token: String, // FIXME: length
|
||||||
request: AnnounceRequest,
|
request: &AnnounceRequest,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
source_ip: source_addr.ip(),
|
source_ip: source_addr.ip(),
|
||||||
|
|
@ -40,17 +49,47 @@ impl DbAnnounceRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, sqlx::FromRow)]
|
#[derive(Debug, sqlx::FromRow)]
|
||||||
pub struct DbAnnounceResponse {
|
struct AnnounceProcedureResults {
|
||||||
pub announce_allowed: bool,
|
announce_allowed: bool,
|
||||||
pub failure_reason: Option<String>,
|
failure_reason: Option<String>,
|
||||||
pub warning_message: Option<String>,
|
warning_message: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_announce_response(
|
pub async fn validate_announce_request(
|
||||||
pool: &Pool<MySql>,
|
pool: &Pool<MySql>,
|
||||||
request: DbAnnounceRequest,
|
source_addr: SocketAddr,
|
||||||
) -> anyhow::Result<DbAnnounceResponse> {
|
user_agent: Option<String>,
|
||||||
let source_ip_bytes: Vec<u8> = match request.source_ip {
|
user_token: String,
|
||||||
|
request: AnnounceRequest,
|
||||||
|
) -> Result<ValidatedAnnounceRequest, FailureResponse> {
|
||||||
|
let parameters = AnnounceProcedureParameters::new(
|
||||||
|
source_addr,
|
||||||
|
user_agent,
|
||||||
|
user_token,
|
||||||
|
&request,
|
||||||
|
);
|
||||||
|
|
||||||
|
match call_announce_procedure(pool, parameters).await {
|
||||||
|
Ok(results) => {
|
||||||
|
if results.announce_allowed {
|
||||||
|
Ok(ValidatedAnnounceRequest(request))
|
||||||
|
} else {
|
||||||
|
Err(FailureResponse::new(results.failure_reason.unwrap_or_else(|| "Not allowed".into())))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
::log::error!("announce procedure error: {:#}", err);
|
||||||
|
|
||||||
|
Err(FailureResponse::new("Internal error"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn call_announce_procedure(
|
||||||
|
pool: &Pool<MySql>,
|
||||||
|
parameters: AnnounceProcedureParameters,
|
||||||
|
) -> anyhow::Result<AnnounceProcedureResults> {
|
||||||
|
let source_ip_bytes: Vec<u8> = match parameters.source_ip {
|
||||||
IpAddr::V4(ip) => ip.octets().into(),
|
IpAddr::V4(ip) => ip.octets().into(),
|
||||||
IpAddr::V6(ip) => ip.octets().into(),
|
IpAddr::V6(ip) => ip.octets().into(),
|
||||||
};
|
};
|
||||||
|
|
@ -81,19 +120,19 @@ pub async fn get_announce_response(
|
||||||
",
|
",
|
||||||
)
|
)
|
||||||
.bind(source_ip_bytes)
|
.bind(source_ip_bytes)
|
||||||
.bind(request.source_port)
|
.bind(parameters.source_port)
|
||||||
.bind(request.user_agent)
|
.bind(parameters.user_agent)
|
||||||
.bind(request.user_token)
|
.bind(parameters.user_token)
|
||||||
.bind(request.info_hash)
|
.bind(parameters.info_hash)
|
||||||
.bind(request.peer_id)
|
.bind(parameters.peer_id)
|
||||||
.bind(request.event.as_str())
|
.bind(parameters.event.as_str())
|
||||||
.bind(request.uploaded)
|
.bind(parameters.uploaded)
|
||||||
.bind(request.downloaded)
|
.bind(parameters.downloaded)
|
||||||
.bind(request.left);
|
.bind(parameters.left);
|
||||||
|
|
||||||
t.execute(q).await?;
|
t.execute(q).await?;
|
||||||
|
|
||||||
let response = sqlx::query_as::<_, DbAnnounceResponse>(
|
let response = sqlx::query_as::<_, AnnounceProcedureResults>(
|
||||||
"
|
"
|
||||||
SELECT
|
SELECT
|
||||||
@p_announce_allowed as announce_allowed,
|
@p_announce_allowed as announce_allowed,
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
mod db;
|
pub mod db;
|
||||||
mod routes;
|
mod routes;
|
||||||
|
|
||||||
use std::net::{SocketAddr, TcpListener};
|
use std::net::{SocketAddr, TcpListener};
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ use axum::{
|
||||||
use sqlx::mysql::MySqlPool;
|
use sqlx::mysql::MySqlPool;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
use aquatic_http_protocol::request::AnnounceRequest;
|
use aquatic_http_protocol::{request::AnnounceRequest, response::FailureResponse};
|
||||||
|
|
||||||
use super::db;
|
use super::db;
|
||||||
|
|
||||||
|
|
@ -23,16 +23,17 @@ pub async fn announce(
|
||||||
|
|
||||||
let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned());
|
let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned());
|
||||||
|
|
||||||
let db_announce_request =
|
let validated_request = db::validate_announce_request(&pool, peer_addr, opt_user_agent, user_token, request).await.map_err(failure_response)?;
|
||||||
db::DbAnnounceRequest::new(peer_addr, opt_user_agent, user_token, request);
|
|
||||||
|
|
||||||
let db_announce_result = db::get_announce_response(&pool, db_announce_request)
|
// TODO: send request to request worker, await oneshot channel response
|
||||||
.await
|
|
||||||
.map_err(anyhow_error)?;
|
|
||||||
|
|
||||||
Ok(format!("{:?}", db_announce_result))
|
Ok(format!("{:?}", validated_request))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn anyhow_error(err: anyhow::Error) -> (StatusCode, String) {
|
fn anyhow_error(err: anyhow::Error) -> (StatusCode, String) {
|
||||||
(StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
|
(StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn failure_response(response: FailureResponse) -> (StatusCode, String) {
|
||||||
|
(StatusCode::OK, format!("{:?}", response))
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue