diff --git a/README.md b/README.md index f3c9345..e821b76 100644 --- a/README.md +++ b/README.md @@ -82,7 +82,7 @@ aquatic-crawler --infohash-file /path/to/info-hash-ipv4.json\ ## Example: - Filter by image ext ``` --preload-regex '\.(png|gif|jpeg|webp)$' ``` + Filter by image ext ``` --preload-regex '(png|gif|jpeg|jpg|webp)$' ``` * requires `storage` argument defined @@ -92,6 +92,11 @@ aquatic-crawler --infohash-file /path/to/info-hash-ipv4.json\ --socks-proxy-url Use `socks5://[username:password@]host:port` +--index-capacity + Estimated info-hash index capacity + + [default: 1000] + -s Crawl loop delay in seconds diff --git a/src/argument.rs b/src/argument.rs index 41b24a6..c03b5bc 100644 --- a/src/argument.rs +++ b/src/argument.rs @@ -66,6 +66,10 @@ pub struct Argument { #[arg(long)] pub socks_proxy_url: Option, + /// Estimated info-hash index capacity + #[arg(long, default_value_t = 1000)] + pub index_capacity: usize, + /// Crawl loop delay in seconds #[arg(short, default_value_t = 300)] pub sleep: u64, diff --git a/src/main.rs b/src/main.rs index e996d15..7d014bc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,7 @@ use storage::Storage; async fn main() -> Result<()> { use clap::Parser; use librqbit::{AddTorrent, AddTorrentOptions, AddTorrentResponse, SessionOptions}; - use std::{num::NonZero, time::Duration}; + use std::{collections::HashSet, num::NonZero, time::Duration}; // init components let arg = argument::Argument::parse(); @@ -41,18 +41,21 @@ async fn main() -> Result<()> { // begin debug.info("Crawler started"); + // collect processed info hashes to skip on the next cycles + let mut index = HashSet::with_capacity(arg.index_capacity); loop { debug.info("Index queue begin..."); - let mut total = 0; // collect info-hashes from each API channel for source in &arg.infohash_file { - debug.info(&format!("Handle info-hash source `{source}`...")); + debug.info(&format!("Index source `{source}`...")); // aquatic server may update the stats at this moment, // handle this state manually match api::infohashes(source) { Ok(infohashes) => { - total += infohashes.len(); for i in infohashes { + if index.contains(&i) { + continue; + } debug.info(&format!("Index `{i}`...")); match session .add_torrent( @@ -78,33 +81,24 @@ async fn main() -> Result<()> { .await { Ok(r) => match r { - AddTorrentResponse::AlreadyManaged(_, t) - | AddTorrentResponse::Added(_, t) => { + AddTorrentResponse::Added(id, mt) => { if arg.save_torrents { - t.with_metadata(|m| { + mt.with_metadata(|m| { save_torrent_file( &storage, &debug, &i, &m.torrent_bytes, ) + // @TODO + // use `r.info` for Memory, SQLite, Manticore and other alternative storage type })?; } - /*tokio::spawn({ - let t = t.clone(); - let d = Duration::from_secs(5); - async move { - loop { - let s = t.stats(); - if s.finished { - break; - } - debug.info(&format!("{s}...")); - tokio::time::sleep(d).await; - } - } - });*/ - // @TODO t.wait_until_completed().await?; + mt.wait_until_completed().await?; + session + .delete(librqbit::api::TorrentIdOrHash::Id(id), false) + .await?; + index.insert(mt.info_hash().as_string()); } AddTorrentResponse::ListOnly(r) => { if arg.save_torrents { @@ -112,7 +106,9 @@ async fn main() -> Result<()> { } // @TODO // use `r.info` for Memory, SQLite, Manticore and other alternative storage type + index.insert(r.info_hash.as_string()); } + AddTorrentResponse::AlreadyManaged(..) => panic!(), // unexpected as should be deleted }, Err(e) => debug.info(&format!("Torrent handle skipped: `{e}`")), } @@ -122,7 +118,8 @@ async fn main() -> Result<()> { } } debug.info(&format!( - "Index of {total} hashes completed, await {} seconds to continue...", + "Index completed, {} total, await {} seconds to continue...", + index.len(), arg.sleep, )); std::thread::sleep(Duration::from_secs(arg.sleep));