diff --git a/Cargo.toml b/Cargo.toml index 14de7f6..0fd220d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ repository = "https://github.com/YGGverse/aquatic-crawler" [dependencies] anyhow = "1.0" +chrono = "0.4" clap = { version = "4.5", features = ["derive"] } hyper-util = "0.1" librqbit = {version = "9.0.0-beta.0", features = ["disable-upload"]} @@ -21,6 +22,7 @@ serde_json = "1.0" tokio = { version = "1.45", features = ["full"] } tracing-subscriber = "0.3" url = "2.5" +urlencoding = "2.1" walkdir = "2.5" [patch.crates-io] librqbit = { git = "https://github.com/ikatson/rqbit.git", package = "librqbit" } diff --git a/README.md b/README.md index 7c7914d..1dee792 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ Crawler for [Aquatic](https://github.com/greatest-ape/aquatic) BitTorrent tracke * [x] File system (`--storage`) * [x] resolve infohash to the `.torrent` file (`--save-torrents`) * [x] download content files match the regex pattern (`--preload-regex`) + * [x] RSS feed export * [ ] [Manticore](https://github.com/manticoresoftware/manticoresearch-rust) full text search * [ ] SQLite @@ -49,99 +50,116 @@ aquatic-crawler --infohash-file /path/to/info-hash-ipv4.json\ ### Options ``` bash --d, --debug - Debug level + -d, --debug + Debug level - * `e` - error * `i` - info * `t` - trace (run with `RUST_LOG=librqbit=trace`) + * `e` - error + * `i` - info + * `t` - trace (run with `RUST_LOG=librqbit=trace`) - [default: ei] + [default: ei] ---clear - Clear previous index collected on crawl session start + --clear + Clear previous index collected on crawl session start ---infohash-file - Absolute filename(s) to the Aquatic tracker info-hash JSON/API + --infohash-file + Absolute filename(s) to the Aquatic tracker info-hash JSON/API - * PR#233 feature + * PR#233 feature ---storage - Directory path to store preloaded data (e.g. `.torrent` files) + --storage + Directory path to store preloaded data (e.g. 
`.torrent` files) ---torrent-tracker - Define custom tracker(s) to preload the `.torrent` files info + --torrent-tracker + Define custom tracker(s) to preload the `.torrent` files info ---initial-peer - Define initial peer(s) to preload the `.torrent` files info + --initial-peer + Define initial peer(s) to preload the `.torrent` files info ---enable-dht - Enable DHT resolver + --export-rss + File path to export RSS feed ---enable-tcp - Enable TCP connection + --export-rss-title + Custom title for RSS feed (channel) ---enable-upnp-port-forwarding - Enable UPnP + [default: aquatic-crawler] ---enable-upload - Enable upload + --export-rss-link + Custom link for RSS feed (channel) ---preload-regex - Preload only files match regex pattern (list only without preload by default) * see also `preload_max_filesize`, `preload_max_filecount` options + --export-rss-description + Custom description for RSS feed (channel) - ## Example: + --enable-dht + Enable DHT resolver - Filter by image ext ``` --preload-regex '(png|gif|jpeg|jpg|webp)$' ``` + --enable-tcp + Enable TCP connection - * requires `storage` argument defined + --enable-upnp-port-forwarding + Enable UPnP ---preload-total-size - Stop crawler on total preload files size reached + --enable-upload + Enable upload ---preload-max-filesize - Max size sum of preloaded files per torrent (match `preload_regex`) + --preload-regex + Preload only files match regex pattern (list only without preload by default) + * see also `preload_max_filesize`, `preload_max_filecount` options ---preload-max-filecount - Max count of preloaded files per torrent (match `preload_regex`) + ## Example: ---save-torrents - Save resolved torrent files to the `storage` location + Filter by image ext ``` --preload-regex '(png|gif|jpeg|jpg|webp)$' ``` ---proxy-url - Use `socks5://[username:password@]host:port` + * requires `storage` argument defined ---peer-connect-timeout + --preload-total-size + Stop crawler on total preload files size reached + + 
+  --preload-max-filesize
+      Max size sum of preloaded files per torrent (match `preload_regex`)
+
+  --preload-max-filecount
+      Max count of preloaded files per torrent (match `preload_regex`)
+
+  --save-torrents
+      Save resolved torrent files to the `storage` location
+
+  --proxy-url
+      Use `socks5://[username:password@]host:port`
+
+  --peer-connect-timeout

---peer-read-write-timeout
+  --peer-read-write-timeout

---peer-keep-alive-interval
+  --peer-keep-alive-interval

---index-capacity
-    Estimated info-hash index capacity
+  --index-capacity
+      Estimated info-hash index capacity

-    [default: 1000]
+      [default: 1000]

---add-torrent-timeout
-    Max time to handle each torrent
+  --add-torrent-timeout
+      Max time to handle each torrent

-    [default: 10]
+      [default: 10]

---sleep
-    Crawl loop delay in seconds
+  --sleep
+      Crawl loop delay in seconds

-    [default: 300]
+      [default: 300]

---upload-limit
-    Limit upload speed (b/s)
+  --upload-limit
+      Limit upload speed (b/s)

---download-limit
-    Limit download speed (b/s)
+  --download-limit
+      Limit download speed (b/s)

--h, --help
-    Print help (see a summary with '-h')
+  -h, --help
+      Print help (see a summary with '-h')

--V, --version
-    Print version
+  -V, --version
+      Print version
 ```
\ No newline at end of file
diff --git a/src/argument.rs b/src/argument.rs
index ec81af4..ed9ee28 100644
--- a/src/argument.rs
+++ b/src/argument.rs
@@ -33,6 +33,22 @@ pub struct Argument {
     #[arg(long)]
     pub initial_peer: Vec<String>,
 
+    /// File path to export RSS feed
+    #[arg(long)]
+    pub export_rss: Option<String>,
+
+    /// Custom title for RSS feed (channel)
+    #[arg(long, default_value_t = String::from("aquatic-crawler"))]
+    pub export_rss_title: String,
+
+    /// Custom link for RSS feed (channel)
+    #[arg(long)]
+    pub export_rss_link: Option<String>,
+
+    /// Custom description for RSS feed (channel)
+    #[arg(long)]
+    pub export_rss_description: Option<String>,
+
     /// Enable DHT resolver
     #[arg(long, default_value_t = false)]
     pub enable_dht: bool,
@@ -76,7 +92,7 @@ pub struct Argument {
     pub preload_max_filecount: Option<usize>,
 
     /// Save resolved torrent files to the `storage` location
-    #[arg(long, default_value_t = true)]
+    #[arg(long, default_value_t = false)]
     pub save_torrents: bool,
 
     /// Use `socks5://[username:password@]host:port`
diff --git a/src/index.rs b/src/index.rs
new file mode 100644
index 0000000..cb2c94e
--- /dev/null
+++ b/src/index.rs
@@ -0,0 +1,7 @@
+use chrono::{DateTime, Utc};
+
+pub struct Index {
+    pub time: DateTime<Utc>,
+    pub node: u64,
+    pub name: Option<String>,
+}
diff --git a/src/main.rs b/src/main.rs
index 85bef64..ee9e0c3 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,13 +1,24 @@
 mod api;
 mod argument;
 mod debug;
+mod index;
 mod peers;
+mod rss;
 mod storage;
 mod trackers;
 
 use anyhow::Result;
+use chrono::Utc;
 use debug::Debug;
+use index::Index;
+use rss::Rss;
+use std::{
+    collections::{HashMap, HashSet},
+    num::NonZero,
+    time::Duration,
+};
 use storage::Storage;
+use url::Url;
 
 #[tokio::main]
 async fn main() -> Result<()> {
@@ -16,11 +27,6 @@ async fn main() -> Result<()> {
         AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions,
         PeerConnectionOptions, SessionOptions,
     };
-    use std::{
-        collections::{HashMap, HashSet},
-        num::NonZero,
-        time::Duration,
-    };
     use tokio::time;
 
     // init components
@@ -58,8 +64,8 @@ async fn main() -> Result<()> {
     // begin
     debug.info("Crawler started");
 
-    // collect processed info hashes to skip on the next iterations (for this session)
+    // * also contains optional meta info to export index as RSS or any other format
     let mut index = HashMap::with_capacity(arg.index_capacity);
     loop {
         debug.info("Index queue begin...");
@@ -80,7 +86,7 @@ async fn main() -> Result<()> {
             match time::timeout(
                 Duration::from_secs(arg.add_torrent_timeout),
                 session.add_torrent(
-                    AddTorrent::from_url(format!("magnet:?xt=urn:btih:{i}")),
+                    AddTorrent::from_url(magnet(&i, None)),
                     Some(AddTorrentOptions {
                         paused: true, // continue after `only_files` init
                         overwrite: true,
@@ -111,7 +117,7 @@
arg.preload_max_filecount.unwrap_or_default(), ); mt.wait_until_initialized().await?; - mt.with_metadata(|m| { + let name = mt.with_metadata(|m| { // init preload files list if let Some(ref regex) = preload_regex { for (id, info) in m.file_infos.iter().enumerate() { @@ -151,6 +157,7 @@ async fn main() -> Result<()> { } // @TODO // use `r.info` for Memory, SQLite, Manticore and other alternative storage type + m.info.name.as_ref().map(|n|n.to_string()) })?; session.update_only_files(&mt, &only_files).await?; session.unpause(&mt).await?; @@ -162,19 +169,33 @@ async fn main() -> Result<()> { .await?; // cleanup irrelevant files (see rqbit#408) storage.cleanup(&i, Some(only_files_keep))?; - // ignore on the next crawl iterations for this session - index.insert(i, only_files_size); + + index.insert( + i, + Index { + time: Utc::now(), + node: only_files_size, + name, + }, + ); } Ok(AddTorrentResponse::ListOnly(r)) => { if arg.save_torrents { save_torrent_file(&storage, &debug, &i, &r.torrent_bytes) } + // @TODO // use `r.info` for Memory, SQLite, // Manticore and other alternative storage type - // ignore on the next crawl iterations for this session - index.insert(i, 0); + index.insert( + i, + Index { + time: Utc::now(), + node: 0, + name: r.info.name.map(|n| n.to_string()), + }, + ); } // unexpected as should be deleted Ok(AddTorrentResponse::AlreadyManaged(..)) => panic!(), @@ -187,9 +208,27 @@ async fn main() -> Result<()> { Err(e) => debug.error(&format!("API issue for `{source}`: `{e}`")), } } + if let Some(ref export_rss) = arg.export_rss { + let mut rss = Rss::new( + export_rss, + &arg.export_rss_title, + &arg.export_rss_link, + &arg.export_rss_description, + Some(trackers.clone()), + )?; + for (k, v) in &index { + rss.push( + k, + v.name.as_ref().unwrap_or(k), + None, // @TODO + Some(&v.time.to_rfc2822()), + )? + } + rss.commit()? 
+            }
             if arg
                 .preload_total_size
-                .is_some_and(|s| index.values().sum::<u64>() > s)
+                .is_some_and(|s| index.values().map(|i| i.node).sum::<u64>() > s)
             {
                 panic!("Preload content size {} bytes reached!", 0)
             }
@@ -212,3 +251,19 @@ fn save_torrent_file(s: &Storage, d: &Debug, i: &str, b: &[u8]) {
         }
     }
 }
+
+/// Build magnet URI
+fn magnet(infohash: &str, trackers: Option<&HashSet<Url>>) -> String {
+    let mut m = if infohash.len() == 40 {
+        format!("magnet:?xt=urn:btih:{infohash}")
+    } else {
+        todo!("infohash v2 is not supported by librqbit")
+    };
+    if let Some(t) = trackers {
+        for tracker in t {
+            m.push_str("&tr=");
+            m.push_str(&urlencoding::encode(tracker.as_str()))
+        }
+    }
+    m
+}
diff --git a/src/rss.rs b/src/rss.rs
new file mode 100644
index 0000000..ef060ed
--- /dev/null
+++ b/src/rss.rs
@@ -0,0 +1,102 @@
+use anyhow::{Result, bail};
+use std::{collections::HashSet, fs::File, io::Write, path::PathBuf, str::FromStr};
+use url::Url;
+
+/// Export crawl index to the RSS file
+pub struct Rss {
+    file: File,
+    target: PathBuf,
+    tmp: PathBuf,
+    trackers: Option<HashSet<Url>>,
+}
+
+impl Rss {
+    /// Create writable file for given `filepath`
+    pub fn new(
+        filepath: &str,
+        title: &str,
+        link: &Option<String>,
+        description: &Option<String>,
+        trackers: Option<HashSet<Url>>,
+    ) -> Result<Self> {
+        // prevent from reading of the incomplete file
+        let tmp = PathBuf::from_str(&format!("{filepath}.tmp"))?;
+
+        // init public destination
+        let target = PathBuf::from_str(filepath)?;
+        if target.is_dir() {
+            bail!("RSS path `{}` is directory", target.to_string_lossy())
+        }
+        // init temporary file to write
+        let mut file = File::create(&tmp)?;
+        file.write_all(
+            b"<?xml version=\"1.0\" encoding=\"UTF-8\"?><rss version=\"2.0\"><channel><title>",
+        )?;
+        file.write_all(escape(title).as_bytes())?;
+        file.write_all(b"</title>")?;
+
+        if let Some(s) = link {
+            file.write_all(b"<link>")?;
+            file.write_all(escape(s).as_bytes())?;
+            file.write_all(b"</link>")?
+        }
+        if let Some(s) = description {
+            file.write_all(b"<description>")?;
+            file.write_all(escape(s).as_bytes())?;
+            file.write_all(b"</description>")?
+        }
+
+        Ok(Self {
+            file,
+            target,
+            trackers,
+            tmp,
+        })
+    }
+
+    /// Append `item` to the feed `channel`
+    pub fn push(
+        &mut self,
+        infohash: &str,
+        title: &str,
+        description: Option<&str>,
+        pub_date: Option<&str>,
+    ) -> Result<()> {
+        self.file.write_all(
+            format!(
+                "<item><guid>{infohash}</guid><title>{}</title><link>{}</link>",
+                escape(title),
+                escape(&crate::magnet(infohash, self.trackers.as_ref()))
+            )
+            .as_bytes(),
+        )?;
+        if let Some(s) = description {
+            self.file.write_all(b"<description>")?;
+            self.file.write_all(escape(s).as_bytes())?;
+            self.file.write_all(b"</description>")?
+        }
+        if let Some(s) = pub_date {
+            self.file.write_all(b"<pubDate>")?;
+            self.file.write_all(escape(s).as_bytes())?;
+            self.file.write_all(b"</pubDate>")?
+        }
+        self.file.write_all(b"</item>")?;
+        Ok(())
+    }
+
+    /// Write final bytes, replace public file with temporary one
+    pub fn commit(mut self) -> Result<()> {
+        self.file.write_all(b"</channel></rss>")?;
+        std::fs::rename(self.tmp, self.target)?;
+        Ok(())
+    }
+}
+
+fn escape(subject: &str) -> String {
+    subject
+        .replace('&', "&amp;")
+        .replace('<', "&lt;")
+        .replace('>', "&gt;")
+        .replace('"', "&quot;")
+        .replace("'", "&#39;")
+}