From 0fb572556fc6878fc47d6b6f9fd329cfb7fee69c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 2 Apr 2022 15:43:44 +0200 Subject: [PATCH] WIP: http_private: work on channel types --- Cargo.lock | 1 + aquatic_http_private/Cargo.toml | 1 + aquatic_http_private/src/workers/common.rs | 11 +++ aquatic_http_private/src/workers/mod.rs | 1 + aquatic_http_private/src/workers/socket/db.rs | 85 ++++++++++++++----- .../src/workers/socket/mod.rs | 2 +- .../src/workers/socket/routes.rs | 15 ++-- 7 files changed, 85 insertions(+), 31 deletions(-) create mode 100644 aquatic_http_private/src/workers/common.rs diff --git a/Cargo.lock b/Cargo.lock index 3326978..5df294d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -151,6 +151,7 @@ dependencies = [ "axum", "dotenv", "hex", + "log", "mimalloc", "serde", "socket2 0.4.4", diff --git a/aquatic_http_private/Cargo.toml b/aquatic_http_private/Cargo.toml index a2665fd..272fed8 100644 --- a/aquatic_http_private/Cargo.toml +++ b/aquatic_http_private/Cargo.toml @@ -19,6 +19,7 @@ anyhow = "1" axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] } dotenv = "0.15" hex = "0.4" +log = "0.4" mimalloc = { version = "0.1", default-features = false } serde = { version = "1", features = ["derive"] } socket2 = { version = "0.4", features = ["all"] } diff --git a/aquatic_http_private/src/workers/common.rs b/aquatic_http_private/src/workers/common.rs new file mode 100644 index 0000000..58db3cd --- /dev/null +++ b/aquatic_http_private/src/workers/common.rs @@ -0,0 +1,11 @@ +use aquatic_common::CanonicalSocketAddr; +use aquatic_http_protocol::response::AnnounceResponse; +use tokio::sync::oneshot::Sender; + +use super::socket::db::ValidatedAnnounceRequest; + +pub struct ChannelAnnounceRequest { + request: ValidatedAnnounceRequest, + source_addr: CanonicalSocketAddr, + response_sender: Sender, +} \ No newline at end of file diff --git a/aquatic_http_private/src/workers/mod.rs b/aquatic_http_private/src/workers/mod.rs index 63fc0ec..978429f 100644 --- a/aquatic_http_private/src/workers/mod.rs +++ b/aquatic_http_private/src/workers/mod.rs @@ -1,2 +1,3 @@ +pub mod common; pub mod request; pub mod socket; diff --git a/aquatic_http_private/src/workers/socket/db.rs b/aquatic_http_private/src/workers/socket/db.rs index 74039cb..4c4f630 100644 --- a/aquatic_http_private/src/workers/socket/db.rs +++ b/aquatic_http_private/src/workers/socket/db.rs @@ -1,10 +1,19 @@ use std::net::{IpAddr, SocketAddr}; -use aquatic_http_protocol::{common::AnnounceEvent, request::AnnounceRequest}; +use aquatic_http_protocol::{common::AnnounceEvent, request::AnnounceRequest, response::FailureResponse}; use sqlx::{Executor, MySql, Pool}; #[derive(Debug)] -pub struct DbAnnounceRequest { +pub struct ValidatedAnnounceRequest(AnnounceRequest); + +impl Into for ValidatedAnnounceRequest { + fn into(self) -> AnnounceRequest { + self.0 + } +} + +#[derive(Debug)] +struct AnnounceProcedureParameters { source_ip: IpAddr, source_port: u16, user_agent: Option, @@ -17,12 +26,12 @@ pub struct DbAnnounceRequest { left: u64, } -impl DbAnnounceRequest { - pub fn new( +impl AnnounceProcedureParameters { + fn new( source_addr: SocketAddr, user_agent: Option, user_token: String, // FIXME: length - request: AnnounceRequest, + request: &AnnounceRequest, ) -> Self { Self { source_ip: source_addr.ip(), @@ -40,17 +49,47 @@ impl DbAnnounceRequest { } #[derive(Debug, sqlx::FromRow)] -pub struct DbAnnounceResponse { - pub announce_allowed: bool, - pub failure_reason: Option, - pub warning_message: Option, +struct AnnounceProcedureResults { + announce_allowed: bool, + failure_reason: Option, + warning_message: Option, } -pub async fn get_announce_response( +pub async fn validate_announce_request( pool: &Pool, - request: DbAnnounceRequest, -) -> anyhow::Result { - let source_ip_bytes: Vec = match request.source_ip { + source_addr: SocketAddr, + user_agent: Option, + user_token: String, + request: AnnounceRequest, +) -> Result { + let parameters = AnnounceProcedureParameters::new( + source_addr, + user_agent, + user_token, + &request, + ); + + match call_announce_procedure(pool, parameters).await { + Ok(results) => { + if results.announce_allowed { + Ok(ValidatedAnnounceRequest(request)) + } else { + Err(FailureResponse::new(results.failure_reason.unwrap_or_else(|| "Not allowed".into()))) + } + } + Err(err) => { + ::log::error!("announce procedure error: {:#}", err); + + Err(FailureResponse::new("Internal error")) + } + } +} + +async fn call_announce_procedure( + pool: &Pool, + parameters: AnnounceProcedureParameters, +) -> anyhow::Result { + let source_ip_bytes: Vec = match parameters.source_ip { IpAddr::V4(ip) => ip.octets().into(), IpAddr::V6(ip) => ip.octets().into(), }; @@ -81,19 +120,19 @@ pub async fn get_announce_response( ", ) .bind(source_ip_bytes) - .bind(request.source_port) - .bind(request.user_agent) - .bind(request.user_token) - .bind(request.info_hash) - .bind(request.peer_id) - .bind(request.event.as_str()) - .bind(request.uploaded) - .bind(request.downloaded) - .bind(request.left); + .bind(parameters.source_port) + .bind(parameters.user_agent) + .bind(parameters.user_token) + .bind(parameters.info_hash) + .bind(parameters.peer_id) + .bind(parameters.event.as_str()) + .bind(parameters.uploaded) + .bind(parameters.downloaded) + .bind(parameters.left); t.execute(q).await?; - let response = sqlx::query_as::<_, DbAnnounceResponse>( + let response = sqlx::query_as::<_, AnnounceProcedureResults>( " SELECT @p_announce_allowed as announce_allowed, diff --git a/aquatic_http_private/src/workers/socket/mod.rs b/aquatic_http_private/src/workers/socket/mod.rs index f986816..7d5d0cd 100644 --- a/aquatic_http_private/src/workers/socket/mod.rs +++ b/aquatic_http_private/src/workers/socket/mod.rs @@ -1,4 +1,4 @@ -mod db; +pub mod db; mod routes; use std::net::{SocketAddr, TcpListener}; diff --git a/aquatic_http_private/src/workers/socket/routes.rs b/aquatic_http_private/src/workers/socket/routes.rs index 7652eaf..8e49872 100644 --- a/aquatic_http_private/src/workers/socket/routes.rs +++ b/aquatic_http_private/src/workers/socket/routes.rs @@ -7,7 +7,7 @@ use axum::{ use sqlx::mysql::MySqlPool; use std::net::SocketAddr; -use aquatic_http_protocol::request::AnnounceRequest; +use aquatic_http_protocol::{request::AnnounceRequest, response::FailureResponse}; use super::db; @@ -23,16 +23,17 @@ pub async fn announce( let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned()); - let db_announce_request = - db::DbAnnounceRequest::new(peer_addr, opt_user_agent, user_token, request); + let validated_request = db::validate_announce_request(&pool, peer_addr, opt_user_agent, user_token, request).await.map_err(failure_response)?; - let db_announce_result = db::get_announce_response(&pool, db_announce_request) - .await - .map_err(anyhow_error)?; + // TODO: send request to request worker, await oneshot channel response - Ok(format!("{:?}", db_announce_result)) + Ok(format!("{:?}", validated_request)) } fn anyhow_error(err: anyhow::Error) -> (StatusCode, String) { (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) } + +fn failure_response(response: FailureResponse) -> (StatusCode, String) { + (StatusCode::OK, format!("{:?}", response)) +}