mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
work on http_private, including parsing announce requests
This commit is contained in:
parent
87223f7952
commit
dc943674f6
7 changed files with 69 additions and 33 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -5,3 +5,4 @@
|
||||||
**/criterion/*/new
|
**/criterion/*/new
|
||||||
|
|
||||||
.DS_Store
|
.DS_Store
|
||||||
|
.env
|
||||||
2
Cargo.lock
generated
2
Cargo.lock
generated
|
|
@ -144,8 +144,10 @@ name = "aquatic_http_private"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"aquatic_http_protocol",
|
||||||
"axum",
|
"axum",
|
||||||
"dotenv",
|
"dotenv",
|
||||||
|
"hex",
|
||||||
"socket2 0.4.4",
|
"socket2 0.4.4",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|
|
||||||
|
|
@ -4,9 +4,12 @@ version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
aquatic_http_protocol = "0.2.0"
|
||||||
|
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] }
|
axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] }
|
||||||
dotenv = "0.15"
|
dotenv = "0.15"
|
||||||
|
hex = "0.4"
|
||||||
socket2 = { version = "0.4", features = ["all"] }
|
socket2 = { version = "0.4", features = ["all"] }
|
||||||
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] }
|
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] }
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ Create stored procedure (`OR REPLACE` keeps privileges in place and is supported
|
||||||
```sql
|
```sql
|
||||||
CREATE OR REPLACE PROCEDURE aquatic_announce_v1 (
|
CREATE OR REPLACE PROCEDURE aquatic_announce_v1 (
|
||||||
IN p_source_ip VARBINARY(16),
|
IN p_source_ip VARBINARY(16),
|
||||||
IN p_source_port SMALLINT,
|
IN p_source_port SMALLINT UNSIGNED,
|
||||||
IN p_user_agent TEXT,
|
IN p_user_agent TEXT,
|
||||||
IN p_user_token VARCHAR(255),
|
IN p_user_token VARCHAR(255),
|
||||||
IN p_info_hash CHAR(40),
|
IN p_info_hash CHAR(40),
|
||||||
|
|
@ -39,3 +39,11 @@ Create `.env` file:
|
||||||
```sh
|
```sh
|
||||||
DATABASE_URL="mysql://aquatic:aquatic@localhost/aquatic"
|
DATABASE_URL="mysql://aquatic:aquatic@localhost/aquatic"
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Run application:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
cargo run -p aquatic_http_private
|
||||||
|
```
|
||||||
|
|
||||||
|
Test by visiting `localhost:3000/abcd/announce/?info_hash=abcdeabcdeabcdeabcde&peer_id=abcdeabcdeabcdeabcde&port=1000&left=0`
|
||||||
|
|
@ -1,7 +1,9 @@
|
||||||
use std::net::{IpAddr, Ipv4Addr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
|
|
||||||
|
use aquatic_http_protocol::{common::AnnounceEvent, request::AnnounceRequest};
|
||||||
use sqlx::{Executor, MySql, Pool};
|
use sqlx::{Executor, MySql, Pool};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct DbAnnounceRequest {
|
pub struct DbAnnounceRequest {
|
||||||
source_ip: IpAddr,
|
source_ip: IpAddr,
|
||||||
source_port: u16,
|
source_port: u16,
|
||||||
|
|
@ -9,11 +11,32 @@ pub struct DbAnnounceRequest {
|
||||||
user_token: String,
|
user_token: String,
|
||||||
info_hash: String,
|
info_hash: String,
|
||||||
peer_id: String,
|
peer_id: String,
|
||||||
event: String,
|
event: AnnounceEvent,
|
||||||
uploaded: u64,
|
uploaded: u64,
|
||||||
downloaded: u64,
|
downloaded: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl DbAnnounceRequest {
|
||||||
|
pub fn new(
|
||||||
|
source_addr: SocketAddr,
|
||||||
|
user_agent: Option<String>,
|
||||||
|
user_token: String, // FIXME: length
|
||||||
|
request: AnnounceRequest,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
source_ip: source_addr.ip(),
|
||||||
|
source_port: source_addr.port(),
|
||||||
|
user_agent,
|
||||||
|
user_token,
|
||||||
|
info_hash: hex::encode(request.info_hash.0),
|
||||||
|
peer_id: hex::encode(request.peer_id.0),
|
||||||
|
event: request.event,
|
||||||
|
uploaded: 0, // FIXME
|
||||||
|
downloaded: 0, // FIXME
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, sqlx::FromRow)]
|
#[derive(Debug, sqlx::FromRow)]
|
||||||
pub struct DbAnnounceResponse {
|
pub struct DbAnnounceResponse {
|
||||||
pub announce_allowed: bool,
|
pub announce_allowed: bool,
|
||||||
|
|
@ -21,28 +44,10 @@ pub struct DbAnnounceResponse {
|
||||||
pub warning_message: Option<String>,
|
pub warning_message: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn announce(pool: &Pool<MySql>) -> Result<DbAnnounceResponse, sqlx::Error> {
|
pub async fn get_announce_response(
|
||||||
let request = DbAnnounceRequest {
|
|
||||||
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
|
|
||||||
source_port: 1000,
|
|
||||||
user_agent: Some("rtorrent".into()),
|
|
||||||
user_token: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
|
|
||||||
info_hash: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
|
|
||||||
peer_id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
|
|
||||||
event: "started".into(),
|
|
||||||
uploaded: 50,
|
|
||||||
downloaded: 100,
|
|
||||||
};
|
|
||||||
|
|
||||||
let announce_response = get_announce_response(&pool, request).await?;
|
|
||||||
|
|
||||||
Ok(announce_response)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_announce_response(
|
|
||||||
pool: &Pool<MySql>,
|
pool: &Pool<MySql>,
|
||||||
request: DbAnnounceRequest,
|
request: DbAnnounceRequest,
|
||||||
) -> Result<DbAnnounceResponse, sqlx::Error> {
|
) -> anyhow::Result<DbAnnounceResponse> {
|
||||||
let source_ip_bytes: Vec<u8> = match request.source_ip {
|
let source_ip_bytes: Vec<u8> = match request.source_ip {
|
||||||
IpAddr::V4(ip) => ip.octets().into(),
|
IpAddr::V4(ip) => ip.octets().into(),
|
||||||
IpAddr::V6(ip) => ip.octets().into(),
|
IpAddr::V6(ip) => ip.octets().into(),
|
||||||
|
|
@ -78,7 +83,7 @@ async fn get_announce_response(
|
||||||
.bind(request.user_token)
|
.bind(request.user_token)
|
||||||
.bind(request.info_hash)
|
.bind(request.info_hash)
|
||||||
.bind(request.peer_id)
|
.bind(request.peer_id)
|
||||||
.bind(request.event)
|
.bind(request.event.as_str())
|
||||||
.bind(request.uploaded)
|
.bind(request.uploaded)
|
||||||
.bind(request.downloaded);
|
.bind(request.downloaded);
|
||||||
|
|
||||||
|
|
@ -94,9 +99,9 @@ async fn get_announce_response(
|
||||||
",
|
",
|
||||||
)
|
)
|
||||||
.fetch_one(&mut t)
|
.fetch_one(&mut t)
|
||||||
.await;
|
.await?;
|
||||||
|
|
||||||
t.commit().await?;
|
t.commit().await?;
|
||||||
|
|
||||||
response
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ pub fn run_socket_worker() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_app(tcp_listener: TcpListener) -> anyhow::Result<()> {
|
async fn run_app(tcp_listener: TcpListener) -> anyhow::Result<()> {
|
||||||
let db_url = ::std::env::var("DATABASE_URL").unwrap();
|
let db_url = ::std::env::var("DATABASE_URL").expect("env var DATABASE_URL");
|
||||||
|
|
||||||
let pool = MySqlPoolOptions::new()
|
let pool = MySqlPoolOptions::new()
|
||||||
.max_connections(5)
|
.max_connections(5)
|
||||||
|
|
@ -34,7 +34,7 @@ async fn run_app(tcp_listener: TcpListener) -> anyhow::Result<()> {
|
||||||
.layer(Extension(pool));
|
.layer(Extension(pool));
|
||||||
|
|
||||||
axum::Server::from_tcp(tcp_listener)?
|
axum::Server::from_tcp(tcp_listener)?
|
||||||
.serve(app.into_make_service())
|
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
||||||
|
|
@ -1,21 +1,38 @@
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::{Path, RawQuery},
|
extract::{ConnectInfo, Path, RawQuery},
|
||||||
headers::UserAgent,
|
headers::UserAgent,
|
||||||
http::StatusCode,
|
http::StatusCode,
|
||||||
Extension, TypedHeader,
|
Extension, TypedHeader,
|
||||||
};
|
};
|
||||||
use sqlx::mysql::MySqlPool;
|
use sqlx::mysql::MySqlPool;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
use aquatic_http_protocol::request::AnnounceRequest;
|
||||||
|
|
||||||
use super::db;
|
use super::db;
|
||||||
|
|
||||||
pub async fn announce(
|
pub async fn announce(
|
||||||
Extension(pool): Extension<MySqlPool>,
|
Extension(pool): Extension<MySqlPool>,
|
||||||
|
ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
|
||||||
opt_user_agent: Option<TypedHeader<UserAgent>>,
|
opt_user_agent: Option<TypedHeader<UserAgent>>,
|
||||||
Path(user_token): Path<String>,
|
Path(user_token): Path<String>,
|
||||||
RawQuery(query): RawQuery,
|
RawQuery(query): RawQuery,
|
||||||
) -> Result<String, (StatusCode, String)> {
|
) -> Result<String, (StatusCode, String)> {
|
||||||
match db::announce(&pool).await {
|
let request = AnnounceRequest::from_query_string(&query.unwrap_or_else(|| "".into()))
|
||||||
Ok(r) => Ok(format!("{:?}", r)),
|
.map_err(anyhow_error)?;
|
||||||
Err(err) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())),
|
|
||||||
}
|
let opt_user_agent = opt_user_agent.map(|header| header.as_str().to_owned());
|
||||||
|
|
||||||
|
let db_announce_request =
|
||||||
|
db::DbAnnounceRequest::new(peer_addr, opt_user_agent, user_token, request);
|
||||||
|
|
||||||
|
let db_announce_result = db::get_announce_response(&pool, db_announce_request)
|
||||||
|
.await
|
||||||
|
.map_err(anyhow_error)?;
|
||||||
|
|
||||||
|
Ok(format!("{:?}", db_announce_result))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn anyhow_error(err: anyhow::Error) -> (StatusCode, String) {
|
||||||
|
(StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue