From dc943674f6d72971324c56a1915ee2c005e1fa17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 2 Apr 2022 13:32:36 +0200 Subject: [PATCH] work on http_private, including parsing announce requests --- .gitignore | 1 + Cargo.lock | 2 + aquatic_http_private/Cargo.toml | 3 + aquatic_http_private/README.md | 10 +++- aquatic_http_private/src/workers/socket/db.rs | 55 ++++++++++--------- .../src/workers/socket/mod.rs | 4 +- .../src/workers/socket/routes.rs | 27 +++++++-- 7 files changed, 69 insertions(+), 33 deletions(-) diff --git a/.gitignore b/.gitignore index 3de5924..0446378 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ **/criterion/*/new .DS_Store +.env \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 3151194..63ee8fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,8 +144,10 @@ name = "aquatic_http_private" version = "0.1.0" dependencies = [ "anyhow", + "aquatic_http_protocol", "axum", "dotenv", + "hex", "socket2 0.4.4", "sqlx", "tokio", diff --git a/aquatic_http_private/Cargo.toml b/aquatic_http_private/Cargo.toml index 2a99b58..eeab0a5 100644 --- a/aquatic_http_private/Cargo.toml +++ b/aquatic_http_private/Cargo.toml @@ -4,9 +4,12 @@ version = "0.1.0" edition = "2021" [dependencies] +aquatic_http_protocol = "0.2.0" + anyhow = "1" axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] } dotenv = "0.15" +hex = "0.4" socket2 = { version = "0.4", features = ["all"] } sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] } tokio = { version = "1", features = ["full"] } diff --git a/aquatic_http_private/README.md b/aquatic_http_private/README.md index 31419b6..57652d0 100644 --- a/aquatic_http_private/README.md +++ b/aquatic_http_private/README.md @@ -16,7 +16,7 @@ Create stored procedure (`OR REPLACE` keeps privileges in place and is supported ```sql CREATE OR REPLACE PROCEDURE aquatic_announce_v1 ( IN p_source_ip VARBINARY(16), - IN p_source_port SMALLINT, + IN p_source_port SMALLINT UNSIGNED, IN p_user_agent TEXT, IN p_user_token VARCHAR(255), IN p_info_hash CHAR(40), @@ -39,3 +39,11 @@ Create `.env` file: ```sh DATABASE_URL="mysql://aquatic:aquatic@localhost/aquatic" ``` + +Run application: + +```sh +cargo run -p aquatic_http_private +``` + +Test by visiting `localhost:3000/abcd/announce/?info_hash=abcdeabcdeabcdeabcde&peer_id=abcdeabcdeabcdeabcde&port=1000&left=0` \ No newline at end of file diff --git a/aquatic_http_private/src/workers/socket/db.rs b/aquatic_http_private/src/workers/socket/db.rs index c6f6f8d..c5c72e4 100644 --- a/aquatic_http_private/src/workers/socket/db.rs +++ b/aquatic_http_private/src/workers/socket/db.rs @@ -1,7 +1,9 @@ -use std::net::{IpAddr, Ipv4Addr}; +use std::net::{IpAddr, SocketAddr}; +use aquatic_http_protocol::{common::AnnounceEvent, request::AnnounceRequest}; use sqlx::{Executor, MySql, Pool}; +#[derive(Debug)] pub struct DbAnnounceRequest { source_ip: IpAddr, source_port: u16, @@ -9,11 +11,32 @@ pub struct DbAnnounceRequest { user_token: String, info_hash: String, peer_id: String, - event: String, + event: AnnounceEvent, uploaded: u64, downloaded: u64, } +impl DbAnnounceRequest { + pub fn new( + source_addr: SocketAddr, + user_agent: Option, + user_token: String, // FIXME: length + request: AnnounceRequest, + ) -> Self { + Self { + source_ip: source_addr.ip(), + source_port: source_addr.port(), + user_agent, + user_token, + info_hash: hex::encode(request.info_hash.0), + peer_id: hex::encode(request.peer_id.0), + event: request.event, + uploaded: 0, // FIXME + downloaded: 0, // FIXME + } + } +} + #[derive(Debug, sqlx::FromRow)] pub struct DbAnnounceResponse { pub announce_allowed: bool, @@ -21,28 +44,10 @@ pub struct DbAnnounceResponse { pub warning_message: Option, } -pub async fn announce(pool: &Pool) -> Result { - let request = DbAnnounceRequest { - source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST), - source_port: 1000, - user_agent: Some("rtorrent".into()), - user_token: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(), - info_hash: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(), - peer_id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(), - event: "started".into(), - uploaded: 50, - downloaded: 100, - }; - - let announce_response = get_announce_response(&pool, request).await?; - - Ok(announce_response) -} - -async fn get_announce_response( +pub async fn get_announce_response( pool: &Pool, request: DbAnnounceRequest, -) -> Result { +) -> anyhow::Result { let source_ip_bytes: Vec = match request.source_ip { IpAddr::V4(ip) => ip.octets().into(), IpAddr::V6(ip) => ip.octets().into(), @@ -78,7 +83,7 @@ async fn get_announce_response( .bind(request.user_token) .bind(request.info_hash) .bind(request.peer_id) - .bind(request.event) + .bind(request.event.as_str()) .bind(request.uploaded) .bind(request.downloaded); @@ -94,9 +99,9 @@ async fn get_announce_response( ", ) .fetch_one(&mut t) - .await; + .await?; t.commit().await?; - response + Ok(response) } diff --git a/aquatic_http_private/src/workers/socket/mod.rs b/aquatic_http_private/src/workers/socket/mod.rs index ccc28ed..696d5b9 100644 --- a/aquatic_http_private/src/workers/socket/mod.rs +++ b/aquatic_http_private/src/workers/socket/mod.rs @@ -22,7 +22,7 @@ pub fn run_socket_worker() -> anyhow::Result<()> { } async fn run_app(tcp_listener: TcpListener) -> anyhow::Result<()> { - let db_url = ::std::env::var("DATABASE_URL").unwrap(); + let db_url = ::std::env::var("DATABASE_URL").expect("env var DATABASE_URL"); let pool = MySqlPoolOptions::new() .max_connections(5) @@ -34,7 +34,7 @@ async fn run_app(tcp_listener: TcpListener) -> anyhow::Result<()> { .layer(Extension(pool)); axum::Server::from_tcp(tcp_listener)? - .serve(app.into_make_service()) + .serve(app.into_make_service_with_connect_info::()) .await?; Ok(()) diff --git a/aquatic_http_private/src/workers/socket/routes.rs b/aquatic_http_private/src/workers/socket/routes.rs index 24b1d43..7652eaf 100644 --- a/aquatic_http_private/src/workers/socket/routes.rs +++ b/aquatic_http_private/src/workers/socket/routes.rs @@ -1,21 +1,38 @@ use axum::{ - extract::{Path, RawQuery}, + extract::{ConnectInfo, Path, RawQuery}, headers::UserAgent, http::StatusCode, Extension, TypedHeader, }; use sqlx::mysql::MySqlPool; +use std::net::SocketAddr; + +use aquatic_http_protocol::request::AnnounceRequest; use super::db; pub async fn announce( Extension(pool): Extension, + ConnectInfo(peer_addr): ConnectInfo, opt_user_agent: Option>, Path(user_token): Path, RawQuery(query): RawQuery, ) -> Result { - match db::announce(&pool).await { - Ok(r) => Ok(format!("{:?}", r)), - Err(err) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())), - } + let request = AnnounceRequest::from_query_string(&query.unwrap_or_else(|| "".into())) + .map_err(anyhow_error)?; + + let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned()); + + let db_announce_request = + db::DbAnnounceRequest::new(peer_addr, opt_user_agent, user_token, request); + + let db_announce_result = db::get_announce_response(&pool, db_announce_request) + .await + .map_err(anyhow_error)?; + + Ok(format!("{:?}", db_announce_result)) +} + +fn anyhow_error(err: anyhow::Error) -> (StatusCode, String) { + (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) }