Implement an option to export the collected index as a static RSS feed; set `save_torrents` to false by default

This commit is contained in:
yggverse 2025-07-06 22:51:51 +03:00
parent c206a06c25
commit 738fee1a88
6 changed files with 272 additions and 72 deletions

View file

@ -12,6 +12,7 @@ repository = "https://github.com/YGGverse/aquatic-crawler"
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
chrono = "0.4"
clap = { version = "4.5", features = ["derive"] } clap = { version = "4.5", features = ["derive"] }
hyper-util = "0.1" hyper-util = "0.1"
librqbit = {version = "9.0.0-beta.0", features = ["disable-upload"]} librqbit = {version = "9.0.0-beta.0", features = ["disable-upload"]}
@ -21,6 +22,7 @@ serde_json = "1.0"
tokio = { version = "1.45", features = ["full"] } tokio = { version = "1.45", features = ["full"] }
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
url = "2.5" url = "2.5"
urlencoding = "2.1"
walkdir = "2.5" walkdir = "2.5"
[patch.crates-io] [patch.crates-io]
librqbit = { git = "https://github.com/ikatson/rqbit.git", package = "librqbit" } librqbit = { git = "https://github.com/ikatson/rqbit.git", package = "librqbit" }

View file

@ -25,6 +25,7 @@ Crawler for [Aquatic](https://github.com/greatest-ape/aquatic) BitTorrent tracke
* [x] File system (`--storage`) * [x] File system (`--storage`)
* [x] resolve infohash to the `.torrent` file (`--save-torrents`) * [x] resolve infohash to the `.torrent` file (`--save-torrents`)
* [x] download content files match the regex pattern (`--preload-regex`) * [x] download content files match the regex pattern (`--preload-regex`)
* [x] RSS feed export
* [ ] [Manticore](https://github.com/manticoresoftware/manticoresearch-rust) full text search * [ ] [Manticore](https://github.com/manticoresoftware/manticoresearch-rust) full text search
* [ ] SQLite * [ ] SQLite
@ -49,44 +50,61 @@ aquatic-crawler --infohash-file /path/to/info-hash-ipv4.json\
### Options ### Options
``` bash ``` bash
-d, --debug <DEBUG> -d, --debug <DEBUG>
Debug level Debug level
* `e` - error * `i` - info * `t` - trace (run with `RUST_LOG=librqbit=trace`) * `e` - error
* `i` - info
* `t` - trace (run with `RUST_LOG=librqbit=trace`)
[default: ei] [default: ei]
--clear --clear
Clear previous index collected on crawl session start Clear previous index collected on crawl session start
--infohash-file <INFOHASH_FILE> --infohash-file <INFOHASH_FILE>
Absolute filename(s) to the Aquatic tracker info-hash JSON/API Absolute filename(s) to the Aquatic tracker info-hash JSON/API
* PR#233 feature * PR#233 feature
--storage <STORAGE> --storage <STORAGE>
Directory path to store preloaded data (e.g. `.torrent` files) Directory path to store preloaded data (e.g. `.torrent` files)
--torrent-tracker <TORRENT_TRACKER> --torrent-tracker <TORRENT_TRACKER>
Define custom tracker(s) to preload the `.torrent` files info Define custom tracker(s) to preload the `.torrent` files info
--initial-peer <INITIAL_PEER> --initial-peer <INITIAL_PEER>
Define initial peer(s) to preload the `.torrent` files info Define initial peer(s) to preload the `.torrent` files info
--enable-dht --export-rss <EXPORT_RSS>
File path to export RSS feed
--export-rss-title <EXPORT_RSS_TITLE>
Custom title for RSS feed (channel)
[default: aquatic-crawler]
--export-rss-link <EXPORT_RSS_LINK>
Custom link for RSS feed (channel)
--export-rss-description <EXPORT_RSS_DESCRIPTION>
Custom description for RSS feed (channel)
--enable-dht
Enable DHT resolver Enable DHT resolver
--enable-tcp --enable-tcp
Enable TCP connection Enable TCP connection
--enable-upnp-port-forwarding --enable-upnp-port-forwarding
Enable UPnP Enable UPnP
--enable-upload --enable-upload
Enable upload Enable upload
--preload-regex <PRELOAD_REGEX> --preload-regex <PRELOAD_REGEX>
Preload only files match regex pattern (list only without preload by default) * see also `preload_max_filesize`, `preload_max_filecount` options Preload only files match regex pattern (list only without preload by default)
* see also `preload_max_filesize`, `preload_max_filecount` options
## Example: ## Example:
@ -94,54 +112,54 @@ aquatic-crawler --infohash-file /path/to/info-hash-ipv4.json\
* requires `storage` argument defined * requires `storage` argument defined
--preload-total-size <PRELOAD_TOTAL_SIZE> --preload-total-size <PRELOAD_TOTAL_SIZE>
Stop crawler on total preload files size reached Stop crawler on total preload files size reached
--preload-max-filesize <PRELOAD_MAX_FILESIZE> --preload-max-filesize <PRELOAD_MAX_FILESIZE>
Max size sum of preloaded files per torrent (match `preload_regex`) Max size sum of preloaded files per torrent (match `preload_regex`)
--preload-max-filecount <PRELOAD_MAX_FILECOUNT> --preload-max-filecount <PRELOAD_MAX_FILECOUNT>
Max count of preloaded files per torrent (match `preload_regex`) Max count of preloaded files per torrent (match `preload_regex`)
--save-torrents --save-torrents
Save resolved torrent files to the `storage` location Save resolved torrent files to the `storage` location
--proxy-url <PROXY_URL> --proxy-url <PROXY_URL>
Use `socks5://[username:password@]host:port` Use `socks5://[username:password@]host:port`
--peer-connect-timeout <PEER_CONNECT_TIMEOUT> --peer-connect-timeout <PEER_CONNECT_TIMEOUT>
--peer-read-write-timeout <PEER_READ_WRITE_TIMEOUT> --peer-read-write-timeout <PEER_READ_WRITE_TIMEOUT>
--peer-keep-alive-interval <PEER_KEEP_ALIVE_INTERVAL> --peer-keep-alive-interval <PEER_KEEP_ALIVE_INTERVAL>
--index-capacity <INDEX_CAPACITY> --index-capacity <INDEX_CAPACITY>
Estimated info-hash index capacity Estimated info-hash index capacity
[default: 1000] [default: 1000]
--add-torrent-timeout <ADD_TORRENT_TIMEOUT> --add-torrent-timeout <ADD_TORRENT_TIMEOUT>
Max time to handle each torrent Max time to handle each torrent
[default: 10] [default: 10]
--sleep <SLEEP> --sleep <SLEEP>
Crawl loop delay in seconds Crawl loop delay in seconds
[default: 300] [default: 300]
--upload-limit <UPLOAD_LIMIT> --upload-limit <UPLOAD_LIMIT>
Limit upload speed (b/s) Limit upload speed (b/s)
--download-limit <DOWNLOAD_LIMIT> --download-limit <DOWNLOAD_LIMIT>
Limit download speed (b/s) Limit download speed (b/s)
-h, --help -h, --help
Print help (see a summary with '-h') Print help (see a summary with '-h')
-V, --version -V, --version
Print version Print version
``` ```

View file

@ -33,6 +33,22 @@ pub struct Argument {
#[arg(long)] #[arg(long)]
pub initial_peer: Vec<String>, pub initial_peer: Vec<String>,
/// File path to export RSS feed
#[arg(long)]
pub export_rss: Option<String>,
/// Custom title for RSS feed (channel)
#[arg(long, default_value_t = String::from("aquatic-crawler"))]
pub export_rss_title: String,
/// Custom link for RSS feed (channel)
#[arg(long)]
pub export_rss_link: Option<String>,
/// Custom description for RSS feed (channel)
#[arg(long)]
pub export_rss_description: Option<String>,
/// Enable DHT resolver /// Enable DHT resolver
#[arg(long, default_value_t = false)] #[arg(long, default_value_t = false)]
pub enable_dht: bool, pub enable_dht: bool,
@ -76,7 +92,7 @@ pub struct Argument {
pub preload_max_filecount: Option<usize>, pub preload_max_filecount: Option<usize>,
/// Save resolved torrent files to the `storage` location /// Save resolved torrent files to the `storage` location
#[arg(long, default_value_t = true)] #[arg(long, default_value_t = false)]
pub save_torrents: bool, pub save_torrents: bool,
/// Use `socks5://[username:password@]host:port` /// Use `socks5://[username:password@]host:port`

7
src/index.rs Normal file
View file

@ -0,0 +1,7 @@
use chrono::{DateTime, Utc};

/// Crawl index entry collected per info-hash during a session
///
/// Used both to skip already processed hashes on the next crawl
/// iterations and as the meta source for the RSS feed export
#[derive(Clone, Debug)]
pub struct Index {
    /// UTC time the info-hash was processed (RSS `pubDate` source)
    pub time: DateTime<Utc>,
    /// Size in bytes of the content preloaded for this torrent
    /// (0 for list-only results)
    pub node: u64,
    /// Torrent name from resolved metadata, if any
    pub name: Option<String>,
}

View file

@ -1,13 +1,24 @@
mod api; mod api;
mod argument; mod argument;
mod debug; mod debug;
mod index;
mod peers; mod peers;
mod rss;
mod storage; mod storage;
mod trackers; mod trackers;
use anyhow::Result; use anyhow::Result;
use chrono::Utc;
use debug::Debug; use debug::Debug;
use index::Index;
use rss::Rss;
use std::{
collections::{HashMap, HashSet},
num::NonZero,
time::Duration,
};
use storage::Storage; use storage::Storage;
use url::Url;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
@ -16,11 +27,6 @@ async fn main() -> Result<()> {
AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions,
PeerConnectionOptions, SessionOptions, PeerConnectionOptions, SessionOptions,
}; };
use std::{
collections::{HashMap, HashSet},
num::NonZero,
time::Duration,
};
use tokio::time; use tokio::time;
// init components // init components
@ -58,8 +64,8 @@ async fn main() -> Result<()> {
// begin // begin
debug.info("Crawler started"); debug.info("Crawler started");
// collect processed info hashes to skip on the next iterations (for this session) // collect processed info hashes to skip on the next iterations (for this session)
// * also contains optional meta info to export index as RSS or any other format
let mut index = HashMap::with_capacity(arg.index_capacity); let mut index = HashMap::with_capacity(arg.index_capacity);
loop { loop {
debug.info("Index queue begin..."); debug.info("Index queue begin...");
@ -80,7 +86,7 @@ async fn main() -> Result<()> {
match time::timeout( match time::timeout(
Duration::from_secs(arg.add_torrent_timeout), Duration::from_secs(arg.add_torrent_timeout),
session.add_torrent( session.add_torrent(
AddTorrent::from_url(format!("magnet:?xt=urn:btih:{i}")), AddTorrent::from_url(magnet(&i, None)),
Some(AddTorrentOptions { Some(AddTorrentOptions {
paused: true, // continue after `only_files` init paused: true, // continue after `only_files` init
overwrite: true, overwrite: true,
@ -111,7 +117,7 @@ async fn main() -> Result<()> {
arg.preload_max_filecount.unwrap_or_default(), arg.preload_max_filecount.unwrap_or_default(),
); );
mt.wait_until_initialized().await?; mt.wait_until_initialized().await?;
mt.with_metadata(|m| { let name = mt.with_metadata(|m| {
// init preload files list // init preload files list
if let Some(ref regex) = preload_regex { if let Some(ref regex) = preload_regex {
for (id, info) in m.file_infos.iter().enumerate() { for (id, info) in m.file_infos.iter().enumerate() {
@ -151,6 +157,7 @@ async fn main() -> Result<()> {
} }
// @TODO // @TODO
// use `r.info` for Memory, SQLite, Manticore and other alternative storage type // use `r.info` for Memory, SQLite, Manticore and other alternative storage type
m.info.name.as_ref().map(|n|n.to_string())
})?; })?;
session.update_only_files(&mt, &only_files).await?; session.update_only_files(&mt, &only_files).await?;
session.unpause(&mt).await?; session.unpause(&mt).await?;
@ -162,19 +169,33 @@ async fn main() -> Result<()> {
.await?; .await?;
// cleanup irrelevant files (see rqbit#408) // cleanup irrelevant files (see rqbit#408)
storage.cleanup(&i, Some(only_files_keep))?; storage.cleanup(&i, Some(only_files_keep))?;
// ignore on the next crawl iterations for this session
index.insert(i, only_files_size); index.insert(
i,
Index {
time: Utc::now(),
node: only_files_size,
name,
},
);
} }
Ok(AddTorrentResponse::ListOnly(r)) => { Ok(AddTorrentResponse::ListOnly(r)) => {
if arg.save_torrents { if arg.save_torrents {
save_torrent_file(&storage, &debug, &i, &r.torrent_bytes) save_torrent_file(&storage, &debug, &i, &r.torrent_bytes)
} }
// @TODO // @TODO
// use `r.info` for Memory, SQLite, // use `r.info` for Memory, SQLite,
// Manticore and other alternative storage type // Manticore and other alternative storage type
// ignore on the next crawl iterations for this session index.insert(
index.insert(i, 0); i,
Index {
time: Utc::now(),
node: 0,
name: r.info.name.map(|n| n.to_string()),
},
);
} }
// unexpected as should be deleted // unexpected as should be deleted
Ok(AddTorrentResponse::AlreadyManaged(..)) => panic!(), Ok(AddTorrentResponse::AlreadyManaged(..)) => panic!(),
@ -187,9 +208,27 @@ async fn main() -> Result<()> {
Err(e) => debug.error(&format!("API issue for `{source}`: `{e}`")), Err(e) => debug.error(&format!("API issue for `{source}`: `{e}`")),
} }
} }
if let Some(ref export_rss) = arg.export_rss {
let mut rss = Rss::new(
export_rss,
&arg.export_rss_title,
&arg.export_rss_link,
&arg.export_rss_description,
Some(trackers.clone()),
)?;
for (k, v) in &index {
rss.push(
k,
v.name.as_ref().unwrap_or(k),
None, // @TODO
Some(&v.time.to_rfc2822()),
)?
}
rss.commit()?
}
if arg if arg
.preload_total_size .preload_total_size
.is_some_and(|s| index.values().sum::<u64>() > s) .is_some_and(|s| index.values().map(|i| i.node).sum::<u64>() > s)
{ {
panic!("Preload content size {} bytes reached!", 0) panic!("Preload content size {} bytes reached!", 0)
} }
@ -212,3 +251,19 @@ fn save_torrent_file(s: &Storage, d: &Debug, i: &str, b: &[u8]) {
} }
} }
} }
/// Build a `magnet:` URI for the given v1 (40-char hex) info-hash
///
/// Optional `trackers` are appended as URL-encoded `tr` parameters in
/// sorted order: a plain `HashSet` walk would reorder them on every
/// run and cause spurious diffs in the exported RSS feed
fn magnet(infohash: &str, trackers: Option<&HashSet<Url>>) -> String {
    let mut m = if infohash.len() == 40 {
        format!("magnet:?xt=urn:btih:{infohash}")
    } else {
        // 64-char hex (v2) hashes can not be resolved by librqbit yet
        todo!("infohash v2 is not supported by librqbit")
    };
    if let Some(t) = trackers {
        // sort for deterministic output across crawl iterations
        let mut sorted: Vec<&Url> = t.iter().collect();
        sorted.sort_unstable_by_key(|u| u.as_str());
        for tracker in sorted {
            m.push_str("&tr=");
            m.push_str(&urlencoding::encode(tracker.as_str()))
        }
    }
    m
}

102
src/rss.rs Normal file
View file

@ -0,0 +1,102 @@
use anyhow::{Result, bail};
use std::{collections::HashSet, fs::File, io::Write, path::PathBuf, str::FromStr};
use url::Url;
/// Export crawl index to the RSS file
pub struct Rss {
file: File,
target: PathBuf,
tmp: PathBuf,
trackers: Option<HashSet<Url>>,
}
impl Rss {
/// Create writable file for given `filepath`
pub fn new(
filepath: &str,
title: &str,
link: &Option<String>,
description: &Option<String>,
trackers: Option<HashSet<Url>>,
) -> Result<Self> {
// prevent from reading of the incomplete file
let tmp = PathBuf::from_str(&format!("{filepath}.tmp"))?;
// init public destination
let target = PathBuf::from_str(filepath)?;
if target.is_dir() {
bail!("RSS path `{}` is directory", target.to_string_lossy())
}
// init temporary file to write
let mut file = File::create(&tmp)?;
file.write_all(
b"<?xml version=\"1.0\" encoding=\"UTF-8\"?><rss version=\"2.0\"><channel><title>",
)?;
file.write_all(escape(title).as_bytes())?;
file.write_all(b"</title>")?;
if let Some(s) = link {
file.write_all(b"<link>")?;
file.write_all(escape(s).as_bytes())?;
file.write_all(b"</link>")?
}
if let Some(s) = description {
file.write_all(b"<description>")?;
file.write_all(escape(s).as_bytes())?;
file.write_all(b"</description>")?
}
Ok(Self {
file,
target,
trackers,
tmp,
})
}
/// Append `item` to the feed `channel`
pub fn push(
&mut self,
infohash: &str,
title: &str,
description: Option<&str>,
pub_date: Option<&str>,
) -> Result<()> {
self.file.write_all(
format!(
"<item><guid>{infohash}</guid><title>{}</title><link>{}</link>",
escape(title),
escape(&crate::magnet(infohash, self.trackers.as_ref()))
)
.as_bytes(),
)?;
if let Some(s) = description {
self.file.write_all(b"<description>")?;
self.file.write_all(escape(s).as_bytes())?;
self.file.write_all(b"</description>")?
}
if let Some(s) = pub_date {
self.file.write_all(b"<pubDate>")?;
self.file.write_all(escape(s).as_bytes())?;
self.file.write_all(b"</pubDate>")?
}
self.file.write_all(b"</item>")?;
Ok(())
}
/// Write final bytes, replace public file with temporary one
pub fn commit(mut self) -> Result<()> {
self.file.write_all(b"</channel></rss>")?;
std::fs::rename(self.tmp, self.target)?;
Ok(())
}
}
fn escape(subject: &str) -> String {
subject
.replace('&', "&amp;")
.replace('<', "&lt;")
.replace('>', "&gt;")
.replace('"', "&quot;")
.replace("'", "&apos;")
}