diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml
new file mode 100644
index 0000000..ada8a24
--- /dev/null
+++ b/.github/FUNDING.yml
@@ -0,0 +1 @@
+custom: https://yggverse.github.io/#donate
\ No newline at end of file
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
new file mode 100644
index 0000000..cb2e928
--- /dev/null
+++ b/.github/workflows/linux.yml
@@ -0,0 +1,24 @@
+name: Linux
+
+on:
+  push:
+    branches: [ "main" ]
+  pull_request:
+    branches: [ "main" ]
+
+env:
+  CARGO_TERM_COLOR: always
+  RUSTFLAGS: -Dwarnings
+
+jobs:
+  build:
+
+    runs-on: ubuntu-latest
+
+    steps:
+    - uses: actions/checkout@v4
+    - run: rustup update
+    - run: cargo fmt --all -- --check
+    - run: cargo clippy --all-targets
+    - run: cargo build --verbose
+    - run: cargo test --verbose
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..869df07
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/target
+Cargo.lock
\ No newline at end of file
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..462fa04
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "yggtrackerd"
+version = "0.1.0"
+edition = "2024"
+license = "MIT"
+readme = "README.md"
+description = "Crawler daemon for the yggtracker-redb index, based on the librqbit API"
+keywords = ["aquatic", "librqbit", "yggtracker", "crawler", "bittorrent"]
+categories = ["network-programming"]
+repository = "https://github.com/YGGverse/yggtrackerd"
+# homepage = "https://yggverse.github.io"
+
+[dependencies]
+anyhow = "1.0"
+chrono = "0.4.41"
+clap = { version = "4.5", features = ["derive"] }
+hyper-util = "0.1"
+librqbit = {version = "9.0.0-beta.1", features = ["disable-upload"] }
+tokio = { version = "1.45", features = ["full"] }
+tracing-subscriber = "0.3"
+url = "2.5"
+urlencoding = "2.1"
+yggtracker-redb = "0.1"
+
+[patch.crates-io]
+yggtracker-redb = { git = "https://github.com/YGGverse/yggtracker-redb.git", rev="008402696d81c4b7d4cc7786c9cf3263ae8632b6" }
+librqbit = { git = "https://github.com/ikatson/rqbit.git", rev="02dfb80b939a52abd0854339e2a8223a71563a68" }
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f3085b7
--- /dev/null
+++ b/README.md
@@ -0,0 +1,30 @@
+# yggtrackerd
+
+![Linux](https://github.com/YGGverse/yggtrackerd/actions/workflows/linux.yml/badge.svg)
+[![Dependencies](https://deps.rs/repo/github/YGGverse/yggtrackerd/status.svg)](https://deps.rs/repo/github/YGGverse/yggtrackerd)
+[![crates.io](https://img.shields.io/crates/v/yggtrackerd.svg)](https://crates.io/crates/yggtrackerd)
+
+Crawler daemon for the yggtracker-redb index, based on the librqbit API
+
+## Install
+
+1. `git clone https://github.com/YGGverse/yggtrackerd.git && cd yggtrackerd`
+2. `cargo build --release`
+3. `sudo install target/release/yggtrackerd /usr/local/bin/yggtrackerd`
+
+## Usage
+
+``` bash
+yggtrackerd --infohash /path/to/info-hash-ipv6.bin\
+            --infohash /path/to/another-source.bin\
+            --tracker udp://host1:port\
+            --tracker udp://host2:port\
+            --database /path/to/index.redb\
+            --preload /path/to/directory
+```
+
+### Options
+
+``` bash
+yggtrackerd --help
+```
\ No newline at end of file
diff --git a/src/api.rs b/src/api.rs
new file mode 100644
index 0000000..220a523
--- /dev/null
+++ b/src/api.rs
@@ -0,0 +1,56 @@
+mod info_hash;
+use info_hash::InfoHash;
+
+/// Parse infohash from the source filepath,
+/// decode hash bytes to `InfoHash` array on success.
+///
+/// * return `None` if the `path` is not reachable
+pub fn get(path: &str, capacity: usize) -> Option<Vec<InfoHash>> {
+    use std::io::Read;
+    if !path.ends_with(".bin") {
+        todo!("Only sources in the `.bin` format are supported!")
+    }
+    if path.contains("://") {
+        todo!("URL source format is not supported!")
+    }
+    const L: usize = 20; // v1 only
+    let mut r = Vec::with_capacity(capacity);
+    let mut f = std::fs::File::open(path).ok()?;
+    loop {
+        let mut b = [0; L];
+        // `read_exact` retries short reads: a plain `read` may legally
+        // return fewer than `L` bytes before EOF and desync the stream
+        match f.read_exact(&mut b) {
+            Ok(()) => r.push(InfoHash::V1(b)),
+            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
+            Err(_) => return None,
+        }
+    }
+    Some(r)
+}
+
+#[test]
+fn test() {
+    use std::fs;
+
+    #[cfg(not(any(target_os = "linux", target_os = "macos",)))]
+    {
+        todo!()
+    }
+
+    const C: usize = 2;
+
+    const P0: &str = "/tmp/yggtrackerd-api-test-0.bin";
+    const P1: &str = "/tmp/yggtrackerd-api-test-1.bin";
+    const P2: &str = "/tmp/yggtrackerd-api-test-2.bin";
+
+    fs::write(P0, vec![]).unwrap();
+    fs::write(P1, vec![1; 40]).unwrap(); // 20 + 20 bytes
+
+    assert!(get(P0, C).is_some_and(|b| b.is_empty()));
+    assert!(get(P1, C).is_some_and(|b| b.len() == 2));
+    assert!(get(P2, C).is_none());
+
+    fs::remove_file(P0).unwrap();
+    fs::remove_file(P1).unwrap();
+}
diff --git a/src/api/info_hash.rs b/src/api/info_hash.rs
new file mode 100644
index 0000000..20827f6
--- /dev/null
+++ b/src/api/info_hash.rs
@@ -0,0 +1,15 @@
+pub enum InfoHash {
+    V1([u8; 20]),
+}
+
+impl std::fmt::Display for InfoHash {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::V1(i) => write!(
+                f,
+                "{}",
+                i.iter().map(|b| format!("{b:02x}")).collect::<String>()
+            ),
+        }
+    }
+}
diff --git a/src/config.rs b/src/config.rs
new file mode 100644
index 0000000..81c0d52
--- /dev/null
+++ b/src/config.rs
@@ -0,0 +1,97 @@
+use clap::Parser;
+use std::path::PathBuf;
+
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+pub struct Config {
+    /// Path to the permanent [redb](https://www.redb.org) database
+    #[arg(long)]
+    pub database: PathBuf,
+
+    /// Print debug output
+    #[arg(long, default_value_t = false)]
+    pub debug: bool,
+
+    /// Absolute path(s) or URL(s) to import infohashes from the Aquatic tracker binary API
+    ///
+    /// * PR#233 feature ([Wiki](https://github.com/YGGverse/aquatic-crawler/wiki/Aquatic))
+    #[arg(long)]
+    pub infohash: Vec<String>,
+
+    /// Define custom tracker(s) to preload the `.torrent` files info
+    #[arg(long)]
+    pub tracker: Vec<String>,
+
+    /// Define initial peer(s) to preload the `.torrent` files info
+    #[arg(long)]
+    pub initial_peer: Vec<String>,
+
+    /// Save resolved torrent files to given directory
+    #[arg(long)]
+    pub export_torrents: Option<PathBuf>,
+
+    /// Appends `--tracker` value to magnets and torrents
+    #[arg(long, default_value_t = false)]
+    pub export_trackers: bool,
+
+    /// Enable DHT resolver
+    #[arg(long, default_value_t = false)]
+    pub enable_dht: bool,
+
+    /// Bind resolver session on specified device name (`tun0`, `mycelium`, etc.)
+    #[arg(long)]
+    pub bind: Option<String>,
+
+    /// Bind listener on specified `host:port` (`[host]:port` for IPv6)
+    ///
+    /// * this option is useful only for binding the data exchange service,
+    /// to restrict the outgoing connections for torrent resolver, use `bind` option instead
+    #[arg(long)]
+    pub listen: Option<String>,
+
+    /// Directory path to store temporary preload data
+    #[arg(long)]
+    pub preload: PathBuf,
+
+    /// Max size sum of preloaded files per torrent (match `preload_regex`)
+    #[arg(long)]
+    pub preload_max_filesize: Option<u64>,
+
+    /// Max count of preloaded files per torrent (match `preload_regex`)
+    #[arg(long)]
+    pub preload_max_filecount: Option<usize>,
+
+    /// Use `socks5://[username:password@]host:port`
+    #[arg(long)]
+    pub proxy_url: Option<String>,
+
+    // Peer options
+    #[arg(long)]
+    pub peer_connect_timeout: Option<u64>,
+
+    #[arg(long)]
+    pub peer_read_write_timeout: Option<u64>,
+
+    #[arg(long)]
+    pub peer_keep_alive_interval: Option<u64>,
+
+    /// Estimated info-hash index capacity
+    #[arg(long, default_value_t = 1000)]
+    pub index_capacity: usize,
+
+    /// Max time to handle each torrent
+    #[arg(long, default_value_t = 10)]
+    pub add_torrent_timeout: u64,
+
+    /// Crawl loop delay in seconds
+    #[arg(long, default_value_t = 300)]
+    pub sleep: u64,
+
+    /// Limit upload speed (b/s)
+    #[arg(long)]
+    pub upload_limit: Option<u32>,
+
+    /// Limit download speed (b/s)
+    #[arg(long)]
+    pub download_limit: Option<u32>,
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..8fe3f43
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,312 @@
+mod api;
+mod config;
+mod peers;
+mod preload;
+mod trackers;
+
+use anyhow::Result;
+use config::Config;
+use librqbit::{
+    AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, PeerConnectionOptions,
+    SessionOptions,
+};
+use peers::Peers;
+use preload::Preload;
+use std::{
+    collections::{HashMap, HashSet},
+    num::NonZero,
+    os::unix::ffi::OsStrExt,
+    path::PathBuf,
+    time::Duration,
+};
+use trackers::Trackers;
+use url::Url;
+use yggtracker_redb::{
+    Database,
+    torrent::{Torrent, meta::*},
+};
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    use chrono::Local;
+    use clap::Parser;
+    use tokio::time;
+
+    // init components
+    let time_init = Local::now();
+    let config = Config::parse();
+    if std::env::var("RUST_LOG").is_ok() {
+        tracing_subscriber::fmt::init()
+    } // librqbit impl dependency
+    let database = Database::init(&config.database)?;
+    let peers = Peers::init(&config.initial_peer)?;
+    let mut preload = Preload::init(
+        config.preload,
+        config.preload_max_filecount,
+        config.preload_max_filesize,
+    )?;
+    let trackers = Trackers::init(&config.tracker)?;
+    let session = librqbit::Session::new_with_opts(
+        preload.directory().clone(),
+        SessionOptions {
+            bind_device_name: config.bind,
+            listen: None,
+            connect: Some(ConnectionOptions {
+                enable_tcp: true,
+                proxy_url: config.proxy_url,
+                peer_opts: Some(PeerConnectionOptions {
+                    connect_timeout: config.peer_connect_timeout.map(Duration::from_secs),
+                    read_write_timeout: config.peer_read_write_timeout.map(Duration::from_secs),
+                    keep_alive_interval: config.peer_keep_alive_interval.map(Duration::from_secs),
+                }),
+            }),
+            disable_upload: false,
+            disable_dht: !config.enable_dht,
+            disable_dht_persistence: true,
+            persistence: None,
+            ratelimits: librqbit::limits::LimitsConfig {
+                upload_bps: config.upload_limit.and_then(NonZero::new),
+                download_bps: config.download_limit.and_then(NonZero::new),
+            },
+            trackers: trackers.list().clone(),
+            ..SessionOptions::default()
+        },
+    )
+    .await?;
+
+    // begin
+    println!("Crawler started on {time_init}");
+    let mut processed = HashSet::with_capacity(config.index_capacity);
+
+    loop {
+        let time_queue = Local::now();
+        if config.debug {
+            println!("\tQueue crawl begin on {time_queue}...")
+        }
+        for source in &config.infohash {
+            if config.debug {
+                println!("\tIndex source `{source}`...")
+            }
+            // grab latest info-hashes from this source
+            // * aquatic server may update the stats at this moment, handle result manually
+            for i in match api::get(source, config.index_capacity) {
+                Some(i) => i,
+                None => {
+                    // skip without panic
+                    if config.debug {
+                        eprintln!(
+                            "The feed `{source}` has an incomplete format (or is still updating); skip."
+                        )
+                    }
+                    continue;
+                }
+            } {
+                // convert to string once
+                let i = i.to_string();
+                // already indexed?
+                if processed.contains(&i) {
+                    continue;
+                }
+                if config.debug {
+                    println!("\t\tIndex `{i}`...")
+                }
+                preload.clear();
+                // run the crawler in single thread for performance reasons,
+                // use `timeout` argument option to skip the dead connections.
+                match time::timeout(
+                    Duration::from_secs(config.add_torrent_timeout),
+                    session.add_torrent(
+                        AddTorrent::from_url(magnet(
+                            &i,
+                            if config.export_trackers && !trackers.is_empty() {
+                                Some(trackers.list())
+                            } else {
+                                None
+                            },
+                        )),
+                        Some(AddTorrentOptions {
+                            paused: true, // continue after `only_files` init
+                            overwrite: true,
+                            disable_trackers: trackers.is_empty(),
+                            initial_peers: peers.initial_peers(),
+                            list_only: false, // we want to grab the images
+                            // it is important to blacklist all files preload until initiation
+                            only_files: Some(Vec::with_capacity(
+                                config.preload_max_filecount.unwrap_or_default(),
+                            )),
+                            // the folder to preload temporary files (e.g. images for the audio albums)
+                            output_folder: Some(preload.directory().to_string_lossy().to_string()),
+                            ..Default::default()
+                        }),
+                    ),
+                )
+                .await
+                {
+                    Ok(r) => match r {
+                        Ok(AddTorrentResponse::Added(id, mt)) => {
+                            let mut update_only_files: HashSet<usize> = HashSet::with_capacity(
+                                config.preload_max_filecount.unwrap_or_default(),
+                            );
+                            let mut images: HashMap<PathBuf, u64> = HashMap::with_capacity(
+                                config.preload_max_filecount.unwrap_or_default(),
+                            );
+                            mt.wait_until_initialized().await?;
+                            let (name, files, is_private, length, bytes) = mt.with_metadata(|m| {
+                                for (index, info) in m.file_infos.iter().enumerate() {
+                                    if preload
+                                        .max_filecount
+                                        .is_some_and(|limit| images.len() + 1 > limit)
+                                    {
+                                        if config.debug {
+                                            println!(
+                                                "\t\t\ttotal files count limit ({}) for `{i}` reached!",
+                                                preload.max_filecount.unwrap()
+                                            )
+                                        }
+                                        break;
+                                    }
+                                    if preload.max_filesize.is_some_and(|limit| {
+                                        let sum: u64 = images.values().sum();
+                                        sum + info.len > limit
+                                    }) {
+                                        if config.debug {
+                                            println!(
+                                                "\t\t\ttotal files size limit `{i}` reached!"
+                                            )
+                                        }
+                                        break;
+                                    }
+                                    if info.relative_filename.extension().is_none_or(|e| {
+                                        !matches!(e.as_bytes(), b"png" | b"jpeg" | b"jpg" | b"gif" | b"webp")
+                                    }) {
+                                        if config.debug {
+                                            println!(
+                                                "\t\t\tskip non-image file `{}` for `{i}`.",
+                                                info.relative_filename.to_string_lossy()
+                                            )
+                                        }
+                                        continue;
+                                    }
+                                    images.insert(info.relative_filename.clone(), info.len);
+                                    // `only_files` expects the file index within the torrent
+                                    assert!(update_only_files.insert(index))
+                                }
+                                let mi = m.info.info();
+                                (
+                                    mi.name.as_ref().map(|s| s.to_string()),
+                                    mi.files.as_ref().map(|f| {
+                                        let mut b = Vec::with_capacity(f.len());
+                                        let mut i = f.iter();
+                                        for f in i.by_ref() {
+                                            b.push(File {
+                                                path: String::from_utf8(
+                                                    f.path
+                                                        .iter()
+                                                        .enumerate()
+                                                        .flat_map(|(n, b)| {
+                                                            if n == 0 {
+                                                                b.0.to_vec()
+                                                            } else {
+                                                                let mut p = vec![b'/'];
+                                                                p.extend(b.0.to_vec());
+                                                                p
+                                                            }
+                                                        })
+                                                        .collect(),
+                                                )
+                                                .ok(),
+                                                length: f.length,
+                                            });
+                                        }
+                                        b.sort_by(|a, b| a.path.cmp(&b.path)); // @TODO optional
+                                        b
+                                    }),
+                                    mi.private,
+                                    mi.length,
+                                    m.torrent_bytes.clone().into(),
+                                )
+                            })?;
+                            session.update_only_files(&mt, &update_only_files).await?;
+                            session.unpause(&mt).await?;
+                            mt.wait_until_completed().await?;
+                            assert!(
+                                database
+                                    .set_torrent(
+                                        &i,
+                                        Torrent {
+                                            bytes,
+                                            meta: Meta {
+                                                comment: None, // @TODO
+                                                files,
+                                                images: if images.is_empty() {
+                                                    None
+                                                } else {
+                                                    let mut b = Vec::with_capacity(images.len());
+                                                    for path in images.keys() {
+                                                        b.push(Image {
+                                                            bytes: preload.bytes(path)?,
+                                                            path: path
+                                                                .to_string_lossy()
+                                                                .to_string(),
+                                                        })
+                                                    }
+                                                    Some(b)
+                                                },
+                                                is_private,
+                                                name,
+                                                length,
+                                                time: chrono::Utc::now(),
+                                            },
+                                        },
+                                    )?
+                                    .is_none()
+                            );
+                            println!("\t\t\tadd torrent `{i}`");
+                            // remove torrent from session as indexed
+                            session
+                                .delete(librqbit::api::TorrentIdOrHash::Id(id), false)
+                                .await?;
+
+                            if config.debug {
+                                println!("\t\t\tadd `{i}` to index.")
+                            }
+                            assert!(processed.insert(i))
+                        }
+                        Ok(_) => panic!(),
+                        Err(e) => eprintln!("Failed to resolve `{i}`: `{e}`."),
+                    },
+                    Err(e) => {
+                        if config.debug {
+                            println!("\t\t\tfailed to resolve `{i}`: `{e}`")
+                        }
+                    }
+                }
+            }
+        }
+        if config.debug {
+            println!(
+                "Queue completed on {time_queue}\n\ttotal: {}\n\ttime: {} s\n\tuptime: {} s\n\tawait {} seconds to continue...",
+                processed.len(),
+                Local::now()
+                    .signed_duration_since(time_queue)
+                    .as_seconds_f32(),
+                Local::now()
+                    .signed_duration_since(time_init)
+                    .as_seconds_f32(),
+                config.sleep,
+            )
+        }
+        // async-aware sleep: `std::thread::sleep` would block the runtime worker
+        time::sleep(Duration::from_secs(config.sleep)).await
+    }
+}
+
+/// Build magnet URI
+fn magnet(infohash: &str, trackers: Option<&HashSet<Url>>) -> String {
+    let mut m = if infohash.len() == 40 {
+        format!("magnet:?xt=urn:btih:{infohash}")
+    } else {
+        todo!("infohash v2 is not supported by librqbit")
+    };
+    if let Some(t) = trackers {
+        for tracker in t {
+            m.push_str("&tr=");
+            m.push_str(&urlencoding::encode(tracker.as_str()))
+        }
+    }
+    m
+}
diff --git a/src/peers.rs b/src/peers.rs
new file mode 100644
index 0000000..f43da62
--- /dev/null
+++ b/src/peers.rs
@@ -0,0 +1,21 @@
+use std::{net::SocketAddr, str::FromStr};
+
+pub struct Peers(Vec<SocketAddr>);
+
+impl Peers {
+    pub fn init(peers: &Vec<String>) -> anyhow::Result<Self> {
+        let mut p = Vec::with_capacity(peers.len());
+        for peer in peers {
+            p.push(SocketAddr::from_str(peer)?);
+        }
+        Ok(Self(p))
+    }
+
+    pub fn initial_peers(&self) -> Option<Vec<SocketAddr>> {
+        if self.0.is_empty() {
+            None
+        } else {
+            Some(self.0.clone())
+        }
+    }
+}
diff --git a/src/preload.rs b/src/preload.rs
new file mode 100644
index 0000000..88f56aa
--- /dev/null
+++ b/src/preload.rs
@@ -0,0 +1,53 @@
+use anyhow::{Result, bail};
+use std::path::PathBuf;
+
+/// Temporary file storage for
+/// `librqbit` preload data
+pub struct Preload {
+    directory: PathBuf,
+    pub max_filecount: Option<usize>,
+    pub max_filesize: Option<u64>,
+}
+
+impl Preload {
+    pub fn init(
+        directory: PathBuf,
+        max_filecount: Option<usize>,
+        max_filesize: Option<u64>,
+    ) -> Result<Self> {
+        if !directory.is_dir() {
+            bail!("Preload location is not directory!");
+        }
+        Ok(Self {
+            max_filecount,
+            max_filesize,
+            directory,
+        })
+    }
+
+    /// Remove previously preloaded files (the directory itself is kept)
+    pub fn clear(&mut self) {
+        if let Ok(d) = std::fs::read_dir(&self.directory) {
+            for e in d.flatten() {
+                let p = e.path();
+                let _ = if p.is_dir() {
+                    std::fs::remove_dir_all(&p)
+                } else {
+                    std::fs::remove_file(&p)
+                };
+            }
+        }
+    }
+
+    pub fn directory(&self) -> &PathBuf {
+        &self.directory
+    }
+
+    pub fn bytes(&self, path: &PathBuf) -> Result<Vec<u8>> {
+        Ok(std::fs::read({
+            let mut p = PathBuf::from(&self.directory);
+            p.push(path);
+            p
+        })?)
+    }
+}
diff --git a/src/trackers.rs b/src/trackers.rs
new file mode 100644
index 0000000..d01c1e2
--- /dev/null
+++ b/src/trackers.rs
@@ -0,0 +1,20 @@
+use std::{collections::HashSet, str::FromStr};
+use url::Url;
+
+pub struct Trackers(HashSet<Url>);
+
+impl Trackers {
+    pub fn init(trackers: &Vec<String>) -> anyhow::Result<Self> {
+        let mut t = HashSet::with_capacity(trackers.len());
+        for tracker in trackers {
+            t.insert(Url::from_str(tracker)?);
+        }
+        Ok(Self(t))
+    }
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+    pub fn list(&self) -> &HashSet<Url> {
+        &self.0
+    }
+}