diff --git a/Cargo.toml b/Cargo.toml index 7b6db30..ffd684f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,15 +11,15 @@ repository = "https://github.com/YGGverse/aquatic-crawler" # homepage = "https://yggverse.github.io" [dependencies] -anyhow = "1.0.98" +anyhow = "1.0" clap = { version = "4.5", features = ["derive"] } -hyper-util = "0.1.14" -librqbit = {version = "9.0.0-beta.0", features = ["disable-upload"]} +hyper-util = "0.1" +librqbit = {version = "8.1", features = ["disable-upload"]} serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1.45", features = ["full"] } tracing-subscriber = "0.3" -url = "2.5.4" +url = "2.5" [patch.crates-io] -librqbit = { git = "https://github.com/ikatson/rqbit.git", branch = "tracker-udp-dualstack", package = "librqbit" } -#librqbit = { path = "../../rqbit/crates/librqbit", package = "librqbit" } \ No newline at end of file +librqbit = { version = "9.0.0-beta.0", git = "https://github.com/ikatson/rqbit.git", package = "librqbit", features = ["disable-upload"] } +#librqbit = { version = "9.0.0-beta.0", path = "../../rqbit/crates/librqbit", package = "librqbit" } \ No newline at end of file diff --git a/README.md b/README.md index df3d47b..f3c9345 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ aquatic-crawler --infohash-file /path/to/info-hash-ipv4.json\ -d, --debug Debug level - * `e` - error * `i` - info * `t` - trace (e.g. to run with `RUST_LOG=librqbit=trace`) + * `e` - error * `i` - info * `t` - trace (run with `RUST_LOG=librqbit=trace`) [default: ei] @@ -60,7 +60,7 @@ aquatic-crawler --infohash-file /path/to/info-hash-ipv4.json\ * PR#233 feature --storage - Directory path to store reload data (e.g. `.torrent` files) + Directory path to store preloaded data (e.g. `.torrent` files) --torrent-tracker Define custom tracker(s) to preload the `.torrent` files info @@ -71,14 +71,38 @@ aquatic-crawler --infohash-file /path/to/info-hash-ipv4.json\ --enable-dht Enable DHT resolver +--enable-upnp-port-forwarding + Enable UPnP + --enable-upload Enable upload +--preload-regex + Preload files match regex pattern (list only without preload by default) + + ## Example: + + Filter by image ext ``` --preload-regex '\.(png|gif|jpeg|webp)$' ``` + + * requires `storage` argument defined + +--save-torrents + Save resolved torrent files to the `storage` location + +--socks-proxy-url + Use `socks5://[username:password@]host:port` + -s Crawl loop delay in seconds [default: 300] +--upload-limit + Limit upload speed (b/s) + +--download-limit + Limit download speed (b/s) + -h, --help Print help (see a summary with '-h') diff --git a/src/argument.rs b/src/argument.rs index 9cfe13a..be06afd 100644 --- a/src/argument.rs +++ b/src/argument.rs @@ -7,7 +7,7 @@ pub struct Argument { /// /// * `e` - error /// * `i` - info - /// * `t` - trace (e.g. to run with `RUST_LOG=librqbit=trace`) + /// * `t` - trace (run with `RUST_LOG=librqbit=trace`) #[arg(short, long, default_value_t = String::from("ei"))] pub debug: String, @@ -21,9 +21,9 @@ pub struct Argument { #[arg(long)] pub infohash_file: Vec, - /// Directory path to store reload data (e.g. `.torrent` files) + /// Directory path to store preloaded data (e.g. `.torrent` files) #[arg(long)] - pub storage: Option, + pub storage: String, /// Define custom tracker(s) to preload the `.torrent` files info #[arg(long)] @@ -45,6 +45,23 @@ pub struct Argument { #[arg(long, default_value_t = false)] pub enable_upload: bool, + /// Preload files match regex pattern (list only without preload by default) + /// + /// ## Example: + /// + /// Filter by image ext + /// ``` + /// --preload-regex '\.(png|gif|jpeg|webp)$' + /// ``` + /// + /// * requires `storage` argument defined + #[arg(long)] + pub preload_regex: Option, + + /// Save resolved torrent files to the `storage` location + #[arg(long, default_value_t = true)] + pub save_torrents: bool, + /// Use `socks5://[username:password@]host:port` #[arg(long)] pub socks_proxy_url: Option, @@ -52,4 +69,12 @@ pub struct Argument { /// Crawl loop delay in seconds #[arg(short, default_value_t = 300)] pub sleep: u64, + + /// Limit upload speed (b/s) + #[arg(long)] + pub upload_limit: Option, + + /// Limit download speed (b/s) + #[arg(long)] + pub download_limit: Option, } diff --git a/src/database.rs b/src/database.rs deleted file mode 100644 index 89c2730..0000000 --- a/src/database.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod torrent; diff --git a/src/database/torrent.rs b/src/database/torrent.rs deleted file mode 100644 index 984ee56..0000000 --- a/src/database/torrent.rs +++ /dev/null @@ -1,38 +0,0 @@ -use anyhow::{Result, bail}; -use std::{fs, io::Write, path::PathBuf, str::FromStr}; - -pub struct Storage(PathBuf); - -impl Storage { - pub fn init(storage: &str, clear: bool) -> Result { - let p = PathBuf::from_str(storage)?; - if let Ok(t) = fs::metadata(&p) { - if t.is_file() { - bail!("Target destination is not directory!") - } - if t.is_dir() && clear { - fs::remove_dir_all(&p)?; - } - } - fs::create_dir_all(&p)?; - Ok(Self(p)) - } - - pub fn exists(&self, infohash: &str) -> bool { - fs::metadata(self.filename(infohash)).is_ok_and(|p| p.is_file()) - } - - pub fn save(&self, infohash: &str, bytes: &[u8]) -> Result { - let p = self.filename(infohash); - let mut f = fs::File::create(&p)?; - f.write_all(bytes)?; - Ok(p) - } - - fn filename(&self, infohash: &str) -> PathBuf { - let mut p = PathBuf::new(); - p.push(&self.0); - p.push(format!("{infohash}.torrent")); - p - } -} diff --git a/src/debug.rs b/src/debug.rs index 46024ec..f7cc8b7 100644 --- a/src/debug.rs +++ b/src/debug.rs @@ -1,9 +1,28 @@ -pub fn error(e: &anyhow::Error) { - eprintln!("[{}] [error] {e}", now()) -} +mod level; +use level::Level; -pub fn info(message: String) { - println!("[{}] [info] {message}", now()) +pub struct Debug(Vec); + +impl Debug { + pub fn init(levels: &str) -> anyhow::Result { + let mut l = Vec::with_capacity(levels.len()); + for s in levels.to_lowercase().chars() { + l.push(Level::parse(s)?); + } + Ok(Self(l)) + } + + pub fn error(&self, message: &str) { + if self.0.contains(&Level::Error) { + eprintln!("[{}] [error] {message}", now()); + } + } + + pub fn info(&self, message: &str) { + if self.0.contains(&Level::Info) { + println!("[{}] [info] {message}", now()); + } + } } fn now() -> u128 { diff --git a/src/debug/level.rs b/src/debug/level.rs new file mode 100644 index 0000000..5b629aa --- /dev/null +++ b/src/debug/level.rs @@ -0,0 +1,22 @@ +use anyhow::{Result, bail}; + +#[derive(PartialEq)] +pub enum Level { + Error, + Info, + Trace, +} + +impl Level { + pub fn parse(value: char) -> Result { + match value { + 'e' => Ok(Self::Error), + 'i' => Ok(Self::Info), + 't' => { + tracing_subscriber::fmt::init(); + Ok(Self::Trace) + } + _ => bail!("Unsupported debug value `{value}`!"), + } + } +} diff --git a/src/main.rs b/src/main.rs index e9501b8..e996d15 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,132 +1,141 @@ mod api; mod argument; -mod database; mod debug; +mod peers; +mod storage; +mod trackers; + +use anyhow::Result; +use debug::Debug; +use storage::Storage; #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> Result<()> { use clap::Parser; - use librqbit::SessionOptions; - use std::str::FromStr; + use librqbit::{AddTorrent, AddTorrentOptions, AddTorrentResponse, SessionOptions}; + use std::{num::NonZero, time::Duration}; - let argument = argument::Argument::parse(); - - // calculate debug level once - let is_debug_i = argument.debug.contains("i"); - let is_debug_e = argument.debug.contains("e"); - - if argument.debug.contains("t") { - tracing_subscriber::fmt::init() - } - - // init shared members - let torrent_storage = if let Some(t) = argument.storage { - let s = database::torrent::Storage::init(&t, argument.clear)?; - if argument.clear && is_debug_i { - debug::info(String::from("Cleanup torrent storage")); - } - Some(s) - } else { - None - }; - - let mut trackers = std::collections::HashSet::with_capacity(argument.torrent_tracker.len()); - for tracker in argument.torrent_tracker { - trackers.insert(url::Url::from_str(&tracker)?); - } - - let mut peers = Vec::with_capacity(argument.initial_peer.len()); - for peer in argument.initial_peer { - peers.push(std::net::SocketAddr::from_str(&peer)?); - } + // init components + let arg = argument::Argument::parse(); + let debug = Debug::init(&arg.debug)?; + let peers = peers::Peers::init(&arg.initial_peer)?; + let storage = Storage::init(&arg.storage, arg.clear)?; + let trackers = trackers::Trackers::init(&arg.torrent_tracker)?; + let session = librqbit::Session::new_with_opts( + storage.path(), + SessionOptions { + disable_upload: !arg.enable_upload, + disable_dht: !arg.enable_dht, + disable_dht_persistence: true, + persistence: None, + ratelimits: librqbit::limits::LimitsConfig { + upload_bps: arg.upload_limit.and_then(NonZero::new), + download_bps: arg.download_limit.and_then(NonZero::new), + }, + trackers: trackers.clone(), + ..SessionOptions::default() + }, + ) + .await?; // begin - if is_debug_i { - debug::info(String::from("Crawler started")); - } + debug.info("Crawler started"); loop { - if is_debug_i { - debug::info(String::from("New index session begin...")); - } + debug.info("Index queue begin..."); let mut total = 0; - let session = librqbit::Session::new_with_opts( - std::path::PathBuf::new(), - SessionOptions { - disable_dht: !argument.enable_dht, - disable_upload: !argument.enable_upload, - persistence: None, - trackers: trackers.clone(), - ..SessionOptions::default() - }, - ) - .await?; - // collect info-hashes from API - for source in &argument.infohash_file { - if is_debug_i { - debug::info(format!("Handle info-hash source `{source}`...")); - } - + // collect info-hashes from each API channel + for source in &arg.infohash_file { + debug.info(&format!("Handle info-hash source `{source}`...")); // aquatic server may update the stats at this moment, // handle this state manually match api::infohashes(source) { Ok(infohashes) => { total += infohashes.len(); for i in infohashes { - if torrent_storage.as_ref().is_some_and(|s| !s.exists(&i)) { - if is_debug_i { - debug::info(format!("Resolve `{i}`...")); - } - match session - .add_torrent( - librqbit::AddTorrent::from_url(format!( - "magnet:?xt=urn:btih:{i}" - )), - Some(librqbit::AddTorrentOptions { - disable_trackers: trackers.is_empty(), - initial_peers: if peers.is_empty() { - None - } else { - Some(peers.clone()) - }, - list_only: true, - ..Default::default() - }), - ) - .await? - { - librqbit::AddTorrentResponse::ListOnly(r) => { - if let Some(ref s) = torrent_storage { - let p = s.save(&i, &r.torrent_bytes)?; - if is_debug_i { - debug::info(format!( - "Add new torrent file `{}`", - p.to_string_lossy() - )); + debug.info(&format!("Index `{i}`...")); + match session + .add_torrent( + AddTorrent::from_url(format!("magnet:?xt=urn:btih:{i}")), + Some(AddTorrentOptions { + overwrite: true, + disable_trackers: trackers.is_empty(), + initial_peers: if peers.is_empty() { + None + } else { + Some(peers.clone()) + }, + // preload nothing, but listing when regex pattern argument is given + list_only: arg.preload_regex.is_none(), + // this option allows rqbit manager to preload some or any files match pattern + // * useful to build index with multimedia files, like images for audio albums + output_folder: storage.output_folder(&i).ok(), + // applies preload some files to the destination directory (above) + only_files_regex: arg.preload_regex.clone(), + ..Default::default() + }), + ) + .await + { + Ok(r) => match r { + AddTorrentResponse::AlreadyManaged(_, t) + | AddTorrentResponse::Added(_, t) => { + if arg.save_torrents { + t.with_metadata(|m| { + save_torrent_file( + &storage, + &debug, + &i, + &m.torrent_bytes, + ) + })?; + } + /*tokio::spawn({ + let t = t.clone(); + let d = Duration::from_secs(5); + async move { + loop { + let s = t.stats(); + if s.finished { + break; + } + debug.info(&format!("{s}...")); + tokio::time::sleep(d).await; + } } + });*/ + // @TODO t.wait_until_completed().await?; + } + AddTorrentResponse::ListOnly(r) => { + if arg.save_torrents { + save_torrent_file(&storage, &debug, &i, &r.torrent_bytes) } // @TODO // use `r.info` for Memory, SQLite, Manticore and other alternative storage type } - _ => panic!(), - } + }, + Err(e) => debug.info(&format!("Torrent handle skipped: `{e}`")), } } } - Err(ref e) => { - if is_debug_e { - debug::error(e) - } - } + Err(e) => debug.error(&e.to_string()), } } - session.stop().await; - if is_debug_i { - debug::info(format!( - "Index of {total} hashes completed, await {} seconds to continue...", - argument.sleep, - )); - } - std::thread::sleep(std::time::Duration::from_secs(argument.sleep)); + debug.info(&format!( + "Index of {total} hashes completed, await {} seconds to continue...", + arg.sleep, + )); + std::thread::sleep(Duration::from_secs(arg.sleep)); + } +} + +fn save_torrent_file(s: &Storage, d: &Debug, i: &str, b: &[u8]) { + if s.torrent_exists(i) { + d.info(&format!("Torrent file `{i}` already exists, skip")) + } else { + match s.save_torrent(i, b) { + Ok(r) => d.info(&format!("Add torrent file `{}`", r.to_string_lossy())), + Err(e) => d.error(&e.to_string()), + } } } diff --git a/src/peers.rs b/src/peers.rs new file mode 100644 index 0000000..dadaee6 --- /dev/null +++ b/src/peers.rs @@ -0,0 +1,21 @@ +use std::{net::SocketAddr, str::FromStr}; + +pub struct Peers(Vec); + +impl Peers { + pub fn init(peers: &Vec) -> anyhow::Result { + let mut p = Vec::with_capacity(peers.len()); + for peer in peers { + p.push(SocketAddr::from_str(peer)?); + } + Ok(Self(p)) + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + pub fn clone(&self) -> Vec { + self.0.clone() + } +} diff --git a/src/storage.rs b/src/storage.rs new file mode 100644 index 0000000..cc2c11d --- /dev/null +++ b/src/storage.rs @@ -0,0 +1,61 @@ +use anyhow::{Result, bail}; +use std::{fs, io::Write, path::PathBuf, str::FromStr}; + +pub struct Storage(PathBuf); + +impl Storage { + pub fn init(storage: &str, clear: bool) -> Result { + let p = PathBuf::from_str(storage)?; + if let Ok(t) = fs::metadata(&p) { + if t.is_file() { + bail!("Storage destination is not directory!"); + } + if t.is_dir() && clear { + for i in fs::read_dir(&p)? { + let r = i?.path(); + if r.is_dir() { + fs::remove_dir_all(&r)?; + } else { + fs::remove_file(&r)?; + } + } + } + } + fs::create_dir_all(&p)?; + Ok(Self(p)) + } + + pub fn torrent_exists(&self, infohash: &str) -> bool { + fs::metadata(self.torrent(infohash)) + .is_ok_and(|p| p.is_file() || p.is_dir() || p.is_symlink()) + } + + pub fn save_torrent(&self, infohash: &str, bytes: &[u8]) -> Result { + let p = self.torrent(infohash); + let mut f = fs::File::create(&p)?; + f.write_all(bytes)?; + Ok(p) + } + + pub fn output_folder(&self, infohash: &str) -> Result { + let mut p = PathBuf::new(); + p.push(&self.0); + p.push(infohash); + if p.is_file() { + bail!("File destination is not directory!"); + } + fs::create_dir_all(&p)?; + Ok(p.to_string_lossy().to_string()) + } + + pub fn path(&self) -> PathBuf { + self.0.clone() + } + + fn torrent(&self, infohash: &str) -> PathBuf { + let mut p = PathBuf::new(); + p.push(&self.0); + p.push(format!("{infohash}.torrent")); + p + } +} diff --git a/src/trackers.rs b/src/trackers.rs new file mode 100644 index 0000000..38d7707 --- /dev/null +++ b/src/trackers.rs @@ -0,0 +1,20 @@ +use std::{collections::HashSet, str::FromStr}; +use url::Url; + +pub struct Trackers(HashSet); + +impl Trackers { + pub fn init(trackers: &Vec) -> anyhow::Result { + let mut t = HashSet::with_capacity(trackers.len()); + for tracker in trackers { + t.insert(Url::from_str(tracker)?); + } + Ok(Self(t)) + } + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + pub fn clone(&self) -> HashSet { + self.0.clone() + } +}