WIP: start work on aquatic_http_private

This commit is contained in:
Joakim Frostegård 2022-04-02 12:27:48 +02:00
parent f0a662e474
commit 11829f98eb
11 changed files with 2821 additions and 22 deletions

904
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -6,6 +6,7 @@ members = [
"aquatic_common", "aquatic_common",
"aquatic_http", "aquatic_http",
"aquatic_http_load_test", "aquatic_http_load_test",
"aquatic_http_private",
"aquatic_http_protocol", "aquatic_http_protocol",
"aquatic_toml_config", "aquatic_toml_config",
"aquatic_toml_config_derive", "aquatic_toml_config_derive",
@ -24,6 +25,7 @@ aquatic_cli_helpers = { path = "aquatic_cli_helpers" }
aquatic_common = { path = "aquatic_common" } aquatic_common = { path = "aquatic_common" }
aquatic_http_load_test = { path = "aquatic_http_load_test" } aquatic_http_load_test = { path = "aquatic_http_load_test" }
aquatic_http = { path = "aquatic_http" } aquatic_http = { path = "aquatic_http" }
aquatic_http_private = { path = "aquatic_http_private" }
aquatic_http_protocol = { path = "aquatic_http_protocol" } aquatic_http_protocol = { path = "aquatic_http_protocol" }
aquatic_toml_config_derive = { path = "aquatic_toml_config_derive" } aquatic_toml_config_derive = { path = "aquatic_toml_config_derive" }
aquatic_toml_config = { path = "aquatic_toml_config" } aquatic_toml_config = { path = "aquatic_toml_config" }

2
aquatic_http_private/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
.env

1664
aquatic_http_private/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,13 @@
[package]
name = "aquatic_http_private"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1"
axum = { version = "0.5", default-features = false, features = ["headers", "http1", "matched-path", "original-uri"] }
dotenv = "0.15"
socket2 = { version = "0.4", features = ["all"] }
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls" , "mysql" ] }
tokio = { version = "1", features = ["full"] }

View file

@ -0,0 +1,41 @@
# aquatic_private
## Setup
Create user:
```sql
CREATE DATABASE aquatic;
CREATE USER 'aquatic'@localhost IDENTIFIED BY 'aquatic';
GRANT EXECUTE ON PROCEDURE aquatic.aquatic_announce_v1 TO 'aquatic'@localhost;
FLUSH PRIVILEGES;
```
Create stored procedure (`OR REPLACE` keeps privileges in place and is supported by MariaDB since 10.1.3):
```sql
CREATE OR REPLACE PROCEDURE aquatic_announce_v1 (
IN p_source_ip VARBINARY(16),
IN p_source_port SMALLINT,
IN p_user_agent TEXT,
IN p_user_token VARCHAR(255),
IN p_info_hash CHAR(40),
IN p_peer_id CHAR(40),
IN p_event VARCHAR(9),
IN p_uploaded BIGINT,
IN p_downloaded BIGINT,
OUT p_announce_allowed BOOLEAN,
OUT p_failure_reason TEXT,
OUT p_warning_message TEXT
)
MODIFIES SQL DATA
BEGIN
SELECT true INTO p_announce_allowed;
END
```
Create `.env` file:
```sh
DATABASE_URL="mysql://aquatic:aquatic@localhost/aquatic"
```

View file

@ -0,0 +1,24 @@
mod workers;
use dotenv::dotenv;
fn main() -> anyhow::Result<()> {
dotenv().ok();
let mut handles = Vec::new();
for _ in 0..2 {
let handle = ::std::thread::Builder::new()
.name("socket".into())
.spawn(move || workers::socket::run_socket_worker())?;
handles.push(handle);
}
for handle in handles {
handle
.join()
.map_err(|err| anyhow::anyhow!("thread join error: {:?}", err))??;
}
Ok(())
}

View file

@ -0,0 +1 @@
pub mod socket;

View file

@ -0,0 +1,102 @@
use std::net::{IpAddr, Ipv4Addr};
use sqlx::{Executor, MySql, Pool};
pub struct DbAnnounceRequest {
source_ip: IpAddr,
source_port: u16,
user_agent: Option<String>,
user_token: String,
info_hash: String,
peer_id: String,
event: String,
uploaded: u64,
downloaded: u64,
}
#[derive(Debug, sqlx::FromRow)]
pub struct DbAnnounceResponse {
pub announce_allowed: bool,
pub failure_reason: Option<String>,
pub warning_message: Option<String>,
}
pub async fn announce(pool: &Pool<MySql>) -> Result<DbAnnounceResponse, sqlx::Error> {
let request = DbAnnounceRequest {
source_ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
source_port: 1000,
user_agent: Some("rtorrent".into()),
user_token: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
info_hash: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
peer_id: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".into(),
event: "started".into(),
uploaded: 50,
downloaded: 100,
};
let announce_response = get_announce_response(&pool, request).await?;
Ok(announce_response)
}
async fn get_announce_response(
pool: &Pool<MySql>,
request: DbAnnounceRequest,
) -> Result<DbAnnounceResponse, sqlx::Error> {
let source_ip_bytes: Vec<u8> = match request.source_ip {
IpAddr::V4(ip) => ip.octets().into(),
IpAddr::V6(ip) => ip.octets().into(),
};
let mut t = pool.begin().await?;
t.execute("SET @p_announce_allowed = false;").await?;
t.execute("SET @p_failure_reason = NULL;").await?;
t.execute("SET @p_warning_message = NULL;").await?;
let q = sqlx::query(
"
CALL aquatic_announce_v1(
?,
?,
?,
?,
?,
?,
?,
?,
?,
@p_announce_allowed,
@p_failure_reason,
@p_warning_message
);
",
)
.bind(source_ip_bytes)
.bind(request.source_port)
.bind(request.user_agent)
.bind(request.user_token)
.bind(request.info_hash)
.bind(request.peer_id)
.bind(request.event)
.bind(request.uploaded)
.bind(request.downloaded);
t.execute(q).await?;
let response = sqlx::query_as::<_, DbAnnounceResponse>(
"
SELECT
@p_announce_allowed as announce_allowed,
@p_failure_reason as failure_reason,
@p_warning_message as warning_message;
",
)
.fetch_one(&mut t)
.await;
t.commit().await?;
response
}

View file

@ -0,0 +1,67 @@
mod db;
mod routes;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpListener};
use anyhow::Context;
use axum::{routing::get, Extension, Router};
use sqlx::mysql::MySqlPoolOptions;
pub fn run_socket_worker() -> anyhow::Result<()> {
let addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000));
let tcp_listener = create_tcp_listener(addr, false)?;
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
runtime.block_on(run_app(tcp_listener))?;
Ok(())
}
async fn run_app(tcp_listener: TcpListener) -> anyhow::Result<()> {
let db_url = ::std::env::var("DATABASE_URL").unwrap();
let pool = MySqlPoolOptions::new()
.max_connections(5)
.connect(&db_url)
.await?;
let app = Router::new()
.route("/:user_token/announce/", get(routes::announce))
.layer(Extension(pool));
axum::Server::from_tcp(tcp_listener)?
.serve(app.into_make_service())
.await?;
Ok(())
}
fn create_tcp_listener(addr: SocketAddr, only_ipv6: bool) -> anyhow::Result<TcpListener> {
let domain = if addr.is_ipv4() {
socket2::Domain::IPV4
} else {
socket2::Domain::IPV6
};
let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?;
if only_ipv6 {
socket.set_only_v6(true).with_context(|| "set only_ipv6")?;
}
socket
.set_reuse_port(true)
.with_context(|| "set_reuse_port")?;
socket
.bind(&addr.into())
.with_context(|| format!("bind to {}", addr))?;
socket
.listen(1024)
.with_context(|| format!("listen on {}", addr))?;
Ok(socket.into())
}

View file

@ -0,0 +1,21 @@
use axum::{
extract::{Path, RawQuery},
headers::UserAgent,
http::StatusCode,
Extension, TypedHeader,
};
use sqlx::mysql::MySqlPool;
use super::db;
pub async fn announce(
Extension(pool): Extension<MySqlPool>,
opt_user_agent: Option<TypedHeader<UserAgent>>,
Path(user_token): Path<String>,
RawQuery(query): RawQuery,
) -> Result<String, (StatusCode, String)> {
match db::announce(&pool).await {
Ok(r) => Ok(format!("{:?}", r)),
Err(err) => Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())),
}
}