initial commit

This commit is contained in:
yggverse 2025-08-01 20:04:38 +03:00
parent 9c4cc62a4f
commit de0747366d
12 changed files with 644 additions and 0 deletions

53
src/api.rs Normal file
View file

@ -0,0 +1,53 @@
mod info_hash;
use info_hash::InfoHash;
/// Parse infohash from the source filepath,
/// decode hash bytes to `InfoHash` array on success.
///
/// * return `None` if the `path` is not reachable or an I/O error
///   occurs mid-read
/// * a trailing partial (< 20 byte) record is ignored, as before
pub fn get(path: &str, capacity: usize) -> Option<Vec<InfoHash>> {
    use std::io::Read;
    if !path.ends_with(".bin") {
        todo!("Only sources in the `.bin` format are supported!")
    }
    if path.contains("://") {
        todo!("URL source format is not supported!")
    }
    const L: usize = 20; // v1 (SHA-1) digest length only
    let mut r = Vec::with_capacity(capacity);
    let mut f = std::fs::File::open(path).ok()?;
    loop {
        let mut b = [0; L];
        // `read_exact` retries on short reads; a single `read` call may
        // legally return fewer than `L` bytes even when more data is
        // available (pipes, network filesystems), which previously
        // truncated the feed mid-stream.
        match f.read_exact(&mut b) {
            Ok(()) => r.push(InfoHash::V1(b)),
            // EOF (including an incomplete trailing record) ends the
            // stream without error, matching the previous behavior
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
            // any other I/O failure: signal "not reachable"
            Err(_) => return None,
        }
    }
    Some(r)
}
#[test]
fn test() {
    use std::fs;
    // only the unix-like targets with a writable `/tmp` are covered
    #[cfg(not(any(target_os = "linux", target_os = "macos",)))]
    {
        todo!()
    }
    const C: usize = 2;
    const P0: &str = "/tmp/yggtrackerd-api-test-0.bin";
    const P1: &str = "/tmp/yggtrackerd-api-test-1.bin";
    const P2: &str = "/tmp/yggtrackerd-api-test-2.bin";
    // empty source: parse succeeds with zero records
    fs::write(P0, b"").unwrap();
    // two complete v1 records (2 x 20 bytes)
    fs::write(P1, [1u8; 40]).unwrap();
    assert!(get(P0, C).is_some_and(|b| b.is_empty()));
    assert!(get(P1, C).is_some_and(|b| b.len() == 2));
    // P2 is never created: an unreachable path yields `None`
    assert!(get(P2, C).is_none());
    fs::remove_file(P0).unwrap();
    fs::remove_file(P1).unwrap();
}

15
src/api/info_hash.rs Normal file
View file

@ -0,0 +1,15 @@
/// A BitTorrent info-hash.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum InfoHash {
    /// v1: 20-byte SHA-1 digest
    V1([u8; 20]),
}

impl std::fmt::Display for InfoHash {
    /// Render as a lowercase hexadecimal string (40 characters for v1).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::V1(bytes) => {
                // write each byte straight to the formatter instead of
                // building intermediate `String`s per byte via
                // `format!` + `collect` (one allocation per byte before)
                for b in bytes {
                    write!(f, "{b:02x}")?;
                }
                Ok(())
            }
        }
    }
}

97
src/config.rs Normal file
View file

@ -0,0 +1,97 @@
use clap::Parser;
use std::path::PathBuf;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
pub struct Config {
    // NOTE: the `///` comments on fields below double as the runtime
    // `--help` text generated by `clap`; reviewer notes therefore use `//`.
    /// Path to the permanent [redb](https://www.redb.org) database
    #[arg(long)]
    pub database: PathBuf,
    /// Print debug output
    #[arg(long, default_value_t = false)]
    pub debug: bool,
    /// Absolute path(s) or URL(s) to import infohashes from the Aquatic tracker binary API
    ///
    /// * PR#233 feature ([Wiki](https://github.com/YGGverse/aquatic-crawler/wiki/Aquatic))
    #[arg(long)]
    pub infohash: Vec<String>,
    /// Define custom tracker(s) to preload the `.torrent` files info
    #[arg(long)]
    pub tracker: Vec<String>,
    /// Define initial peer(s) to preload the `.torrent` files info
    #[arg(long)]
    pub initial_peer: Vec<String>,
    /// Save resolved torrent files to given directory
    // NOTE(review): not referenced anywhere in `main` yet — confirm wiring
    #[arg(long)]
    pub export_torrents: Option<String>,
    /// Appends `--tracker` value to magnets and torrents
    #[arg(long, default_value_t = false)]
    pub export_trackers: bool,
    /// Enable DHT resolver
    #[arg(long, default_value_t = false)]
    pub enable_dht: bool,
    /// Bind resolver session on specified device name (`tun0`, `mycelium`, etc.)
    #[arg(long)]
    pub bind: Option<String>,
    /// Bind listener on specified `host:port` (`[host]:port` for IPv6)
    ///
    /// * this option is useful only for binding the data exchange service,
    /// to restrict the outgoing connections for torrent resolver, use `bind` option instead
    // NOTE(review): `main` passes `listen: None` to `SessionOptions`,
    // so this value appears unused — confirm intended wiring.
    #[arg(long)]
    pub listen: Option<String>,
    /// Directory path to store temporary preload data
    #[arg(long)]
    pub preload: PathBuf,
    /// Max size sum of preloaded files per torrent (match `preload_regex`)
    // compared in `main` against the running sum of matched `info.len` values
    #[arg(long)]
    pub preload_max_filesize: Option<u64>,
    /// Max count of preloaded files per torrent (match `preload_regex`)
    #[arg(long)]
    pub preload_max_filecount: Option<usize>,
    /// Use `socks5://[username:password@]host:port`
    #[arg(long)]
    pub proxy_url: Option<String>,
    // Peer options
    // (all three are seconds: `main` maps them through `Duration::from_secs`)
    #[arg(long)]
    pub peer_connect_timeout: Option<u64>,
    #[arg(long)]
    pub peer_read_write_timeout: Option<u64>,
    #[arg(long)]
    pub peer_keep_alive_interval: Option<u64>,
    /// Estimated info-hash index capacity
    // pre-sizes the `processed` index and per-source result buffers
    #[arg(long, default_value_t = 1000)]
    pub index_capacity: usize,
    /// Max time to handle each torrent
    // seconds; used as the `tokio::time::timeout` around `add_torrent`
    #[arg(long, default_value_t = 10)]
    pub add_torrent_timeout: u64,
    /// Crawl loop delay in seconds
    #[arg(long, default_value_t = 300)]
    pub sleep: u64,
    /// Limit upload speed (b/s)
    #[arg(long)]
    pub upload_limit: Option<u32>,
    /// Limit download speed (b/s)
    #[arg(long)]
    pub download_limit: Option<u32>,
}

312
src/main.rs Normal file
View file

@ -0,0 +1,312 @@
mod api;
mod config;
mod peers;
mod preload;
mod trackers;
use anyhow::Result;
use config::Config;
use librqbit::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, PeerConnectionOptions,
SessionOptions,
};
use peers::Peers;
use preload::Preload;
use std::{
collections::{HashMap, HashSet},
num::NonZero,
os::unix::ffi::OsStrExt,
path::PathBuf,
time::Duration,
};
use trackers::Trackers;
use url::Url;
use yggtracker_redb::{
Database,
torrent::{Torrent, meta::*},
};
#[tokio::main]
/// Crawler entry point: builds the `librqbit` session from CLI options,
/// then loops forever importing info-hashes from each configured source,
/// resolving their metadata and persisting them to the redb database.
async fn main() -> Result<()> {
    use chrono::Local;
    use clap::Parser;
    use tokio::time;
    // init components
    let time_init = Local::now();
    let config = Config::parse();
    if std::env::var("RUST_LOG").is_ok() {
        tracing_subscriber::fmt::init()
    } // librqbit impl dependency
    let database = Database::init(&config.database)?;
    let peers = Peers::init(&config.initial_peer)?;
    let mut preload = Preload::init(
        config.preload,
        config.preload_max_filecount,
        config.preload_max_filesize,
    )?;
    let trackers = Trackers::init(&config.tracker)?;
    // NOTE(review): `config.listen` is declared but `listen: None` is
    // hard-coded below — confirm whether the option should be wired in.
    let session = librqbit::Session::new_with_opts(
        preload.directory().clone(),
        SessionOptions {
            bind_device_name: config.bind,
            listen: None,
            connect: Some(ConnectionOptions {
                enable_tcp: true,
                proxy_url: config.proxy_url,
                // all peer timeouts are given in seconds on the CLI
                peer_opts: Some(PeerConnectionOptions {
                    connect_timeout: config.peer_connect_timeout.map(Duration::from_secs),
                    read_write_timeout: config.peer_read_write_timeout.map(Duration::from_secs),
                    keep_alive_interval: config.peer_keep_alive_interval.map(Duration::from_secs),
                }),
            }),
            disable_upload: false,
            disable_dht: !config.enable_dht,
            disable_dht_persistence: true,
            persistence: None,
            // `NonZero::new` turns a `0` limit into `None` (unlimited)
            ratelimits: librqbit::limits::LimitsConfig {
                upload_bps: config.upload_limit.and_then(NonZero::new),
                download_bps: config.download_limit.and_then(NonZero::new),
            },
            trackers: trackers.list().clone(),
            ..SessionOptions::default()
        },
    )
    .await?;
    // begin
    println!("Crawler started on {time_init}");
    // info-hashes (hex strings) already stored this run; prevents re-adding
    let mut processed = HashSet::with_capacity(config.index_capacity);
    loop {
        let time_queue = Local::now();
        if config.debug {
            println!("\tQueue crawl begin on {time_queue}...")
        }
        for source in &config.infohash {
            if config.debug {
                println!("\tIndex source `{source}`...")
            }
            // grab latest info-hashes from this source
            // * aquatic server may update the stats at this moment, handle result manually
            for i in match api::get(source, config.index_capacity) {
                Some(i) => i,
                None => {
                    // skip without panic
                    if config.debug {
                        eprintln!(
                            "The feed `{source}` has an incomplete format (or is still updating); skip."
                        )
                    }
                    continue;
                }
            } {
                // convert to string once
                let i = i.to_string();
                // already indexed?
                if processed.contains(&i) {
                    continue;
                }
                if config.debug {
                    println!("\t\tIndex `{i}`...")
                }
                // drop temporary files left over from the previous torrent
                preload.clear();
                // run the crawler in single thread for performance reasons,
                // use `timeout` argument option to skip the dead connections.
                match time::timeout(
                    Duration::from_secs(config.add_torrent_timeout),
                    session.add_torrent(
                        AddTorrent::from_url(magnet(
                            &i,
                            if config.export_trackers && !trackers.is_empty() {
                                Some(trackers.list())
                            } else {
                                None
                            },
                        )),
                        Some(AddTorrentOptions {
                            paused: true, // continue after `only_files` init
                            overwrite: true,
                            disable_trackers: trackers.is_empty(),
                            initial_peers: peers.initial_peers(),
                            list_only: false, // we want to grab the images
                            // it is important to blacklist all files preload until initiation
                            only_files: Some(Vec::with_capacity(
                                config.preload_max_filecount.unwrap_or_default(),
                            )),
                            // the folder to preload temporary files (e.g. images for the audio albums)
                            output_folder: Some(preload.directory().to_string_lossy().to_string()),
                            ..Default::default()
                        }),
                    ),
                )
                .await
                {
                    Ok(r) => match r {
                        Ok(AddTorrentResponse::Added(id, mt)) => {
                            // file indices to whitelist for download
                            let mut update_only_files: HashSet<usize> = HashSet::with_capacity(
                                config.preload_max_filecount.unwrap_or_default(),
                            );
                            // image path -> byte length, for later DB export
                            let mut images: HashMap<PathBuf, u64> = HashMap::with_capacity(
                                config.preload_max_filecount.unwrap_or_default(),
                            );
                            mt.wait_until_initialized().await?;
                            let (name, files, is_private, length, bytes) = mt.with_metadata(|m| {
                                for info in &m.file_infos {
                                    // stop selecting once the count limit would be exceeded
                                    if preload.max_filecount.is_some_and(|limit| images.len() + 1 > limit) {
                                        if config.debug {
                                            println!(
                                                "\t\t\ttotal files count limit ({}) for `{i}` reached!",
                                                preload.max_filecount.unwrap()
                                            )
                                        }
                                        break;
                                    }
                                    // stop once the cumulative size limit would be exceeded
                                    if preload.max_filesize.is_some_and(|limit| {
                                        let sum: u64 = images.values().sum();
                                        sum + info.len > limit
                                    }) {
                                        if config.debug {
                                            println!(
                                                "\t\t\ttotal files size limit `{i}` reached!"
                                            )
                                        }
                                        break;
                                    }
                                    // only preload image files (by extension)
                                    if info.relative_filename.extension().is_none_or(|e|
                                        !matches!(e.as_bytes(), b"png" | b"jpeg" | b"jpg" | b"gif" | b"webp")) {
                                        if config.debug {
                                            println!(
                                                "\t\t\tskip non-image file `{}` for `{i}`.",
                                                info.relative_filename.to_string_lossy()
                                            )
                                        }
                                        continue;
                                    }
                                    images.insert(info.relative_filename.clone(), info.len);
                                    // NOTE(review): this inserts the torrent `id` on every
                                    // matched file, so the second matching image makes this
                                    // `assert!` panic (duplicate insert) — the file's index
                                    // within `m.file_infos` was likely intended; confirm.
                                    assert!(update_only_files.insert(id))
                                }
                                let mi = m.info.info();
                                (
                                    mi.name.as_ref().map(|s| s.to_string()),
                                    mi.files.as_ref().map(|f| {
                                        let mut b = Vec::with_capacity(f.len());
                                        let mut i = f.iter();
                                        for f in i.by_ref() {
                                            b.push(File {
                                                // join path components with `/`;
                                                // `None` if not valid UTF-8
                                                path: String::from_utf8(
                                                    f.path
                                                        .iter()
                                                        .enumerate()
                                                        .flat_map(|(n, b)| {
                                                            if n == 0 {
                                                                b.0.to_vec()
                                                            } else {
                                                                let mut p = vec![b'/'];
                                                                p.extend(b.0.to_vec());
                                                                p
                                                            }
                                                        })
                                                        .collect(),
                                                )
                                                .ok(),
                                                length: f.length,
                                            });
                                        }
                                        b.sort_by(|a, b| a.path.cmp(&b.path)); // @TODO optional
                                        b
                                    }),
                                    mi.private,
                                    mi.length,
                                    m.torrent_bytes.clone().into()
                                )
                            })?;
                            // whitelist the selected files, resume, and wait for preload
                            session.update_only_files(&mt, &update_only_files).await?;
                            session.unpause(&mt).await?;
                            mt.wait_until_completed().await?;
                            // persist; asserts the key was not already present
                            assert!(
                                database
                                    .set_torrent(
                                        &i,
                                        Torrent {
                                            bytes,
                                            meta: Meta {
                                                comment: None, // @TODO
                                                files,
                                                images: if images.is_empty() {
                                                    None
                                                } else {
                                                    let mut b = Vec::with_capacity(images.len());
                                                    for path in images.keys() {
                                                        b.push(Image {
                                                            bytes: preload.bytes(path)?,
                                                            path: path
                                                                .to_string_lossy()
                                                                .to_string(),
                                                        })
                                                    }
                                                    Some(b)
                                                },
                                                is_private,
                                                name,
                                                length,
                                                time: chrono::Utc::now(),
                                            },
                                        },
                                    )?
                                    .is_none()
                            );
                            println!("\t\t\tadd torrent `{i}`");
                            // remove torrent from session as indexed
                            session
                                .delete(librqbit::api::TorrentIdOrHash::Id(id), false)
                                .await?;
                            if config.debug {
                                println!("\t\t\tadd `{i}` to index.")
                            }
                            assert!(processed.insert(i))
                        }
                        // NOTE(review): any non-`Added` response aborts the whole
                        // crawler — consider logging and continuing instead.
                        Ok(_) => panic!(),
                        Err(e) => eprintln!("Failed to resolve `{i}`: `{e}`."),
                    },
                    // timeout elapsed: treat as a dead connection and move on
                    Err(e) => {
                        if config.debug {
                            println!("\t\t\tfailed to resolve `{i}`: `{e}`")
                        }
                    }
                }
            }
        }
        if config.debug {
            println!(
                "Queue completed on {time_queue}\n\ttotal: {}\n\ttime: {} s\n\tuptime: {} s\n\tawait {} seconds to continue...",
                processed.len(),
                Local::now()
                    .signed_duration_since(time_queue)
                    .as_seconds_f32(),
                Local::now()
                    .signed_duration_since(time_init)
                    .as_seconds_f32(),
                config.sleep,
            )
        }
        // NOTE(review): blocking sleep inside the async runtime; harmless
        // while nothing else is spawned, but `tokio::time::sleep(...).await`
        // would be the idiomatic non-blocking form.
        std::thread::sleep(Duration::from_secs(config.sleep))
    }
}
/// Build magnet URI
///
/// * `infohash` must be a 40-character (v1) hex string
/// * each tracker, when given, is appended as a URL-encoded `&tr=` parameter
fn magnet(infohash: &str, trackers: Option<&HashSet<Url>>) -> String {
    // only the 40-hex-char v1 form is accepted
    if infohash.len() != 40 {
        todo!("infohash v2 is not supported by librqbit")
    }
    let mut uri = format!("magnet:?xt=urn:btih:{infohash}");
    if let Some(list) = trackers {
        for tracker in list {
            uri.push_str("&tr=");
            uri.push_str(&urlencoding::encode(tracker.as_str()))
        }
    }
    uri
}

21
src/peers.rs Normal file
View file

@ -0,0 +1,21 @@
use std::{net::SocketAddr, str::FromStr};
/// Static set of bootstrap peers for the torrent resolver.
pub struct Peers(Vec<SocketAddr>);

impl Peers {
    /// Parse every `host:port` string into a [`SocketAddr`].
    ///
    /// * returns the first parse error encountered, if any
    pub fn init(peers: &[String]) -> anyhow::Result<Self> {
        // accept a slice instead of `&Vec<String>` (deref coercion keeps
        // existing call sites compatible) and collect fallibly so the
        // first malformed address short-circuits
        let p = peers
            .iter()
            .map(|peer| SocketAddr::from_str(peer))
            .collect::<Result<Vec<_>, _>>()?;
        Ok(Self(p))
    }

    /// Initial peers list for `librqbit` (`None` when none configured).
    pub fn initial_peers(&self) -> Option<Vec<SocketAddr>> {
        if self.0.is_empty() {
            None
        } else {
            Some(self.0.clone())
        }
    }
}

42
src/preload.rs Normal file
View file

@ -0,0 +1,42 @@
use anyhow::{Result, bail};
use std::path::PathBuf;
/// Temporary file storage for `librqbit` preload data
pub struct Preload {
    // root directory that holds the temporary preload files
    directory: PathBuf,
    // max count of files selected per torrent, if limited
    pub max_filecount: Option<usize>,
    // max byte-size sum of files selected per torrent, if limited
    pub max_filesize: Option<u64>,
}

impl Preload {
    /// Validate that `directory` exists and wrap the preload limits.
    pub fn init(
        directory: PathBuf,
        max_filecount: Option<usize>,
        max_filesize: Option<u64>,
    ) -> Result<Self> {
        if !directory.is_dir() {
            bail!("Preload location is not directory!");
        }
        Ok(Self {
            max_filecount,
            max_filesize,
            directory,
        })
    }
    /// Remove every entry inside the preload directory, keeping the
    /// directory itself (and the stored path) intact.
    ///
    /// Best-effort: removal errors on individual entries are ignored.
    pub fn clear(&mut self) {
        // BUGFIX: this previously called `self.directory.clear()`, which
        // truncates the stored `PathBuf` to an empty path (it does NOT
        // touch the filesystem) — after the first call, `directory()`
        // and `bytes()` resolved against `""`. Delete the directory's
        // contents instead.
        if let Ok(entries) = std::fs::read_dir(&self.directory) {
            for entry in entries.flatten() {
                let path = entry.path();
                let _ = if path.is_dir() {
                    std::fs::remove_dir_all(&path)
                } else {
                    std::fs::remove_file(&path)
                };
            }
        }
    }
    /// Path to the preload directory.
    pub fn directory(&self) -> &PathBuf {
        &self.directory
    }
    /// Read the file at `path`, resolved relative to the preload directory.
    pub fn bytes(&self, path: &PathBuf) -> Result<Vec<u8>> {
        // `join` builds the full path; no manual `push` chain needed
        Ok(std::fs::read(self.directory.join(path))?)
    }
}

20
src/trackers.rs Normal file
View file

@ -0,0 +1,20 @@
use std::{collections::HashSet, str::FromStr};
use url::Url;
/// Deduplicated set of tracker URLs for the session.
pub struct Trackers(HashSet<Url>);

impl Trackers {
    /// Parse every tracker string into a [`Url`].
    ///
    /// * returns the first parse error encountered, if any
    /// * duplicates collapse silently via the set
    pub fn init(trackers: &[String]) -> anyhow::Result<Self> {
        // accept a slice instead of `&Vec<String>` (deref coercion keeps
        // existing call sites compatible) and collect fallibly so the
        // first malformed URL short-circuits
        let t = trackers
            .iter()
            .map(|tracker| Url::from_str(tracker))
            .collect::<Result<HashSet<_>, _>>()?;
        Ok(Self(t))
    }
    /// `true` when no trackers are configured.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
    /// Borrow the full tracker set.
    pub fn list(&self) -> &HashSet<Url> {
        &self.0
    }
}