implement a processed-torrents index, skip already-processed info-hashes, and remove each torrent from the session on download completion

This commit is contained in:
yggverse 2025-06-15 01:59:20 +03:00
parent 9b6491a666
commit 307a935d7f
3 changed files with 30 additions and 24 deletions

View file

@ -82,7 +82,7 @@ aquatic-crawler --infohash-file /path/to/info-hash-ipv4.json\
## Example: ## Example:
Filter by image ext ``` --preload-regex '\.(png|gif|jpeg|webp)$' ``` Filter by image ext ``` --preload-regex '(png|gif|jpeg|jpg|webp)$' ```
* requires `storage` argument defined * requires `storage` argument defined
@ -92,6 +92,11 @@ aquatic-crawler --infohash-file /path/to/info-hash-ipv4.json\
--socks-proxy-url <SOCKS_PROXY_URL> --socks-proxy-url <SOCKS_PROXY_URL>
Use `socks5://[username:password@]host:port` Use `socks5://[username:password@]host:port`
--index-capacity <INDEX_CAPACITY>
Estimated info-hash index capacity
[default: 1000]
-s <SLEEP> -s <SLEEP>
Crawl loop delay in seconds Crawl loop delay in seconds

View file

@ -66,6 +66,10 @@ pub struct Argument {
#[arg(long)] #[arg(long)]
pub socks_proxy_url: Option<String>, pub socks_proxy_url: Option<String>,
/// Estimated info-hash index capacity
#[arg(long, default_value_t = 1000)]
pub index_capacity: usize,
/// Crawl loop delay in seconds /// Crawl loop delay in seconds
#[arg(short, default_value_t = 300)] #[arg(short, default_value_t = 300)]
pub sleep: u64, pub sleep: u64,

View file

@ -13,7 +13,7 @@ use storage::Storage;
async fn main() -> Result<()> { async fn main() -> Result<()> {
use clap::Parser; use clap::Parser;
use librqbit::{AddTorrent, AddTorrentOptions, AddTorrentResponse, SessionOptions}; use librqbit::{AddTorrent, AddTorrentOptions, AddTorrentResponse, SessionOptions};
use std::{num::NonZero, time::Duration}; use std::{collections::HashSet, num::NonZero, time::Duration};
// init components // init components
let arg = argument::Argument::parse(); let arg = argument::Argument::parse();
@ -41,18 +41,21 @@ async fn main() -> Result<()> {
// begin // begin
debug.info("Crawler started"); debug.info("Crawler started");
// collect processed info hashes to skip on the next cycles
let mut index = HashSet::with_capacity(arg.index_capacity);
loop { loop {
debug.info("Index queue begin..."); debug.info("Index queue begin...");
let mut total = 0;
// collect info-hashes from each API channel // collect info-hashes from each API channel
for source in &arg.infohash_file { for source in &arg.infohash_file {
debug.info(&format!("Handle info-hash source `{source}`...")); debug.info(&format!("Index source `{source}`..."));
// aquatic server may update the stats at this moment, // aquatic server may update the stats at this moment,
// handle this state manually // handle this state manually
match api::infohashes(source) { match api::infohashes(source) {
Ok(infohashes) => { Ok(infohashes) => {
total += infohashes.len();
for i in infohashes { for i in infohashes {
if index.contains(&i) {
continue;
}
debug.info(&format!("Index `{i}`...")); debug.info(&format!("Index `{i}`..."));
match session match session
.add_torrent( .add_torrent(
@ -78,33 +81,24 @@ async fn main() -> Result<()> {
.await .await
{ {
Ok(r) => match r { Ok(r) => match r {
AddTorrentResponse::AlreadyManaged(_, t) AddTorrentResponse::Added(id, mt) => {
| AddTorrentResponse::Added(_, t) => {
if arg.save_torrents { if arg.save_torrents {
t.with_metadata(|m| { mt.with_metadata(|m| {
save_torrent_file( save_torrent_file(
&storage, &storage,
&debug, &debug,
&i, &i,
&m.torrent_bytes, &m.torrent_bytes,
) )
// @TODO
// use `r.info` for Memory, SQLite, Manticore and other alternative storage type
})?; })?;
} }
/*tokio::spawn({ mt.wait_until_completed().await?;
let t = t.clone(); session
let d = Duration::from_secs(5); .delete(librqbit::api::TorrentIdOrHash::Id(id), false)
async move { .await?;
loop { index.insert(mt.info_hash().as_string());
let s = t.stats();
if s.finished {
break;
}
debug.info(&format!("{s}..."));
tokio::time::sleep(d).await;
}
}
});*/
// @TODO t.wait_until_completed().await?;
} }
AddTorrentResponse::ListOnly(r) => { AddTorrentResponse::ListOnly(r) => {
if arg.save_torrents { if arg.save_torrents {
@ -112,7 +106,9 @@ async fn main() -> Result<()> {
} }
// @TODO // @TODO
// use `r.info` for Memory, SQLite, Manticore and other alternative storage type // use `r.info` for Memory, SQLite, Manticore and other alternative storage type
index.insert(r.info_hash.as_string());
} }
AddTorrentResponse::AlreadyManaged(..) => panic!(), // unexpected as should be deleted
}, },
Err(e) => debug.info(&format!("Torrent handle skipped: `{e}`")), Err(e) => debug.info(&format!("Torrent handle skipped: `{e}`")),
} }
@ -122,7 +118,8 @@ async fn main() -> Result<()> {
} }
} }
debug.info(&format!( debug.info(&format!(
"Index of {total} hashes completed, await {} seconds to continue...", "Index completed, {} total, await {} seconds to continue...",
index.len(),
arg.sleep, arg.sleep,
)); ));
std::thread::sleep(Duration::from_secs(arg.sleep)); std::thread::sleep(Duration::from_secs(arg.sleep));