psocks/src/main.rs
2026-03-27 11:29:51 +02:00

207 lines
6.1 KiB
Rust

mod opt;
mod rules;
mod stats;
use anyhow::Context;
use fast_socks5::{
ReplyError, Result, Socks5Command, SocksError,
server::{DnsResolveHelper as _, Socks5ServerProtocol, run_tcp_proxy, run_udp_proxy},
};
use log::*;
use opt::{AuthMode, Opt};
use rocket::{State, http::Status, serde::json::Json};
use rules::Rules;
use stats::{Snap, Total};
use std::{future::Future, sync::Arc, time::Instant};
use structopt::StructOpt;
use tokio::{net::TcpListener, task};
#[rocket::get("/")]
async fn index(totals: &State<Arc<Total>>, startup_time: &State<Instant>) -> Json<Snap> {
Json(totals.inner().snap(startup_time.elapsed().as_secs())) // @TODO implement Web UI
}
#[rocket::get("/api/totals")]
async fn api_totals(totals: &State<Arc<Total>>, startup_time: &State<Instant>) -> Json<Snap> {
Json(totals.inner().snap(startup_time.elapsed().as_secs()))
}
#[rocket::get("/api/allow/<rule>")]
async fn api_allow(
rule: &str,
rules: &State<Arc<Rules>>,
totals: &State<Arc<Total>>,
) -> Result<Json<bool>, Status> {
let result = rules.allow(rule).await;
totals.set_entries(rules.total().await);
info!("Delete `{rule}` from the in-memory rules (operation status: {result:?})");
Ok(Json(result.map_err(|e| {
error!("Allow request handle error for `{rule}`: `{e}`");
Status::InternalServerError
})?))
}
#[rocket::get("/api/block/<rule>")]
async fn api_block(
rule: &str,
rules: &State<Arc<Rules>>,
totals: &State<Arc<Total>>,
) -> Result<Json<bool>, Status> {
let result = rules.block(rule).await;
totals.set_entries(rules.total().await);
info!("Add `{rule}` to the in-memory rules (operation status: {result:?})");
Ok(Json(result.map_err(|e| {
error!("Block request handle error for `{rule}`: `{e}`");
Status::InternalServerError
})?))
}
#[rocket::get("/api/rules")]
async fn api_rules(rules: &State<Arc<Rules>>) -> Result<Json<Vec<String>>, Status> {
let active = rules.active().await;
info!("Get rules (total: {})", active.len());
Ok(Json(active))
}
#[rocket::launch]
async fn rocket() -> _ {
env_logger::init();
let opt: &'static Opt = Box::leak(Box::new(Opt::from_args()));
let rules = Arc::new(Rules::from_opt(&opt.allow_list).await.unwrap());
let totals = Arc::new(Total::with_rules(rules.total().await));
tokio::spawn({
let socks_rules = rules.clone();
let socks_totals = totals.clone();
async move {
if let Err(err) = spawn_socks_server(opt, socks_rules, socks_totals).await {
error!("SOCKS server failed: `{err}`");
}
}
});
rocket::build()
.configure(rocket::Config {
port: opt.api_addr.port(),
address: opt.api_addr.ip(),
..rocket::Config::release_default()
})
.manage(rules)
.manage(totals)
.manage(Instant::now())
.mount(
"/",
rocket::routes![index, api_totals, api_allow, api_block, api_rules],
)
}
async fn spawn_socks_server(
opt: &'static Opt,
rules: Arc<Rules>,
totals: Arc<Total>,
) -> Result<()> {
if opt.allow_udp && opt.public_addr.is_none() {
return Err(SocksError::ArgumentInputError(
"Can't allow UDP if public-addr is not set",
));
}
if opt.skip_auth && opt.auth != AuthMode::NoAuth {
return Err(SocksError::ArgumentInputError(
"Can't use skip-auth flag and authentication altogether.",
));
}
let listener = TcpListener::bind(&opt.listen_addr).await?;
info!("Listen for socks connections @ {}", &opt.listen_addr);
loop {
match listener.accept().await {
Ok((socket, _client_addr)) => {
spawn_and_log_error(serve_socks5(opt, socket, rules.clone(), totals.clone()));
}
Err(err) => error!("accept error = {:?}", err),
}
}
}
async fn serve_socks5(
opt: &Opt,
socket: tokio::net::TcpStream,
rules: Arc<Rules>,
totals: Arc<Total>,
) -> Result<(), SocksError> {
totals.increase_request();
let request = match &opt.auth {
AuthMode::NoAuth if opt.skip_auth => {
Socks5ServerProtocol::skip_auth_this_is_not_rfc_compliant(socket)
}
AuthMode::NoAuth => Socks5ServerProtocol::accept_no_auth(socket).await?,
AuthMode::Password { username, password } => {
Socks5ServerProtocol::accept_password_auth(socket, |user, pass| {
user == *username && pass == *password
})
.await?
.0
}
}
.read_command()
.await?;
let (host, _) = request.2.clone().into_string_and_port();
if !rules.any(&host).await {
totals.increase_blocked();
info!("Blocked connection attempt to: {host}");
request
.0
.reply_error(&ReplyError::ConnectionNotAllowed)
.await?;
return Err(ReplyError::ConnectionNotAllowed.into());
}
let (proto, cmd, addr) = request.resolve_dns().await?;
match cmd {
Socks5Command::TCPConnect => {
run_tcp_proxy(proto, &addr, opt.request_timeout, false).await?;
}
Socks5Command::UDPAssociate if opt.allow_udp => {
run_udp_proxy(
proto,
&addr,
None,
opt.public_addr.context("invalid reply ip")?,
None,
)
.await?;
}
_ => {
proto.reply_error(&ReplyError::CommandNotSupported).await?;
return Err(ReplyError::CommandNotSupported.into());
}
};
Ok(())
}
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
where
F: Future<Output = Result<()>> + Send + 'static,
{
task::spawn(async move {
match fut.await {
Ok(()) => {}
Err(err) => match err {
SocksError::ReplyError(reply_error) => {
if !matches!(reply_error, ReplyError::ConnectionNotAllowed) {
error!("{reply_error:#}")
}
}
_ => error!("{err:#}"),
},
}
})
}