diff --git a/aquatic_http_private/src/workers/common.rs b/aquatic_http_private/src/workers/common.rs index 58db3cd..ae73ace 100644 --- a/aquatic_http_private/src/workers/common.rs +++ b/aquatic_http_private/src/workers/common.rs @@ -5,7 +5,7 @@ use tokio::sync::oneshot::Sender; use super::socket::db::ValidatedAnnounceRequest; pub struct ChannelAnnounceRequest { - request: ValidatedAnnounceRequest, - source_addr: CanonicalSocketAddr, - response_sender: Sender, -} \ No newline at end of file + pub request: ValidatedAnnounceRequest, + pub source_addr: CanonicalSocketAddr, + pub response_sender: Sender, +} diff --git a/aquatic_http_private/src/workers/socket/db.rs b/aquatic_http_private/src/workers/socket/db.rs index 4c4f630..b2b094e 100644 --- a/aquatic_http_private/src/workers/socket/db.rs +++ b/aquatic_http_private/src/workers/socket/db.rs @@ -1,6 +1,8 @@ use std::net::{IpAddr, SocketAddr}; -use aquatic_http_protocol::{common::AnnounceEvent, request::AnnounceRequest, response::FailureResponse}; +use aquatic_http_protocol::{ + common::AnnounceEvent, request::AnnounceRequest, response::FailureResponse, +}; use sqlx::{Executor, MySql, Pool}; #[derive(Debug)] @@ -62,19 +64,19 @@ pub async fn validate_announce_request( user_token: String, request: AnnounceRequest, ) -> Result { - let parameters = AnnounceProcedureParameters::new( - source_addr, - user_agent, - user_token, - &request, - ); + let parameters = + AnnounceProcedureParameters::new(source_addr, user_agent, user_token, &request); match call_announce_procedure(pool, parameters).await { Ok(results) => { if results.announce_allowed { Ok(ValidatedAnnounceRequest(request)) } else { - Err(FailureResponse::new(results.failure_reason.unwrap_or_else(|| "Not allowed".into()))) + Err(FailureResponse::new( + results + .failure_reason + .unwrap_or_else(|| "Not allowed".into()), + )) } } Err(err) => { diff --git a/aquatic_http_private/src/workers/socket/routes.rs b/aquatic_http_private/src/workers/socket/routes.rs index 8e49872..a18c23c 100644 --- a/aquatic_http_private/src/workers/socket/routes.rs +++ b/aquatic_http_private/src/workers/socket/routes.rs @@ -1,13 +1,21 @@ +use aquatic_common::CanonicalSocketAddr; use axum::{ extract::{ConnectInfo, Path, RawQuery}, headers::UserAgent, http::StatusCode, + response::IntoResponse, Extension, TypedHeader, }; use sqlx::mysql::MySqlPool; use std::net::SocketAddr; +use tokio::sync::oneshot; -use aquatic_http_protocol::{request::AnnounceRequest, response::FailureResponse}; +use aquatic_http_protocol::{ + request::AnnounceRequest, + response::{AnnounceResponse, FailureResponse, Response}, +}; + +use crate::workers::common::ChannelAnnounceRequest; use super::db; @@ -17,23 +25,42 @@ pub async fn announce( opt_user_agent: Option>, Path(user_token): Path, RawQuery(query): RawQuery, -) -> Result { +) -> Result<(StatusCode, impl IntoResponse), (StatusCode, impl IntoResponse)> { let request = AnnounceRequest::from_query_string(&query.unwrap_or_else(|| "".into())) - .map_err(anyhow_error)?; + .map_err(|err| build_response(Response::Failure(FailureResponse::new("Internal error"))))?; let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned()); - let validated_request = db::validate_announce_request(&pool, peer_addr, opt_user_agent, user_token, request).await.map_err(failure_response)?; + let validated_request = + db::validate_announce_request(&pool, peer_addr, opt_user_agent, user_token, request) + .await + .map_err(|r| build_response(Response::Failure(r)))?; - // TODO: send request to request worker, await oneshot channel response + let (response_sender, response_receiver) = oneshot::channel(); - Ok(format!("{:?}", validated_request)) + let canonical_socket_addr = CanonicalSocketAddr::new(peer_addr); + + let channel_request = ChannelAnnounceRequest { + request: validated_request, + source_addr: canonical_socket_addr, + response_sender, + }; + + // TODO: send request to request worker + + let response = response_receiver.await.map_err(|err| { + ::log::error!("channel response sender closed: {}", err); + + build_response(Response::Failure(FailureResponse::new("Internal error"))) + })?; + + Ok(build_response(Response::Announce(response))) } -fn anyhow_error(err: anyhow::Error) -> (StatusCode, String) { - (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) -} +fn build_response(response: Response) -> (StatusCode, impl IntoResponse) { + let mut response_bytes = Vec::with_capacity(512); -fn failure_response(response: FailureResponse) -> (StatusCode, String) { - (StatusCode::OK, format!("{:?}", response)) + response.write(&mut response_bytes); + + (StatusCode::OK, response_bytes) }