diff --git a/src/config.rs b/src/config.rs index 70050d9..5d14f49 100644 --- a/src/config.rs +++ b/src/config.rs @@ -99,8 +99,12 @@ pub struct Config { pub add_torrent_timeout: u64, /// Crawl loop delay in seconds - #[arg(long, default_value_t = 300)] - pub sleep: u64, + #[arg(long)] + pub sleep: Option<u64>, + + /// Ban unresolvable info-hashes for `n` seconds + #[arg(long, default_value_t = 3600)] + pub ban: u64, /// Limit download speed (b/s) #[arg(long)] diff --git a/src/main.rs b/src/main.rs index 5dfcfaf..1c80928 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,13 +3,18 @@ mod config; mod preload; use anyhow::Result; +use chrono::{DateTime, Utc}; use config::Config; use librqbit::{ AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, PeerConnectionOptions, SessionOptions, }; use preload::Preload; -use std::{collections::HashSet, num::NonZero, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + num::NonZero, + time::Duration, +}; use url::Url; #[tokio::main] @@ -59,10 +64,18 @@ async fn main() -> Result<()> { }, ) .await?; + let mut ban: HashMap<String, DateTime<Utc>> = HashMap::with_capacity(config.index_capacity); log::info!("crawler started at {time_init}"); loop { let time_queue = Local::now(); log::debug!("queue crawl begin at {time_queue}..."); + ban.retain(|k, &mut v| { + let is_banned = v > Utc::now() - Duration::from_secs(config.ban); + if !is_banned { + log::debug!("remove `{k}` from the ban list (timeout expired)") + } + is_banned + }); for source in &config.infohash { log::debug!("index source `{source}`..."); // grab latest info-hashes from this source @@ -81,18 +94,23 @@ async fn main() -> Result<()> { } } { // convert to string once - let i = i.as_string(); - if preload.contains_torrent(&i)? { + let h = i.as_string(); + if preload.contains_torrent(&h)? 
{ + log::debug!("torrent `{h}` exists and already indexed, skip."); continue; } - log::debug!("index `{i}`..."); + if ban.contains_key(&h) { + log::debug!("torrent `{h}` is temporary banned, skip for this queue."); + continue; + } + log::debug!("index `{h}`..."); // run the crawler in single thread for performance reasons, // use `timeout` argument option to skip the dead connections. match time::timeout( Duration::from_secs(config.add_torrent_timeout), session.add_torrent( AddTorrent::from_url(magnet( - &i, + &h, if config.tracker.is_empty() { None } else { @@ -112,7 +130,7 @@ async fn main() -> Result<()> { // the destination folder to preload files match `preload_regex` // * e.g. images for audio albums output_folder: preload - .tmp_dir(&i, true)? + .tmp_dir(&h, true)? .to_str() .map(|s| s.to_string()), ..Default::default() @@ -137,7 +155,7 @@ async fn main() -> Result<()> { .is_some_and(|limit| only_files.len() + 1 > limit) { log::debug!( - "file count limit ({}) reached, skip file `{id}` for `{i}` at `{}` (and other files after it)", + "file count limit ({}) reached, skip file `{id}` for `{h}` at `{}` (and other files after it)", only_files.len(), info.relative_filename.to_string_lossy() ); @@ -145,7 +163,7 @@ async fn main() -> Result<()> { } if preload.max_filesize.is_some_and(|limit| info.len > limit) { log::debug!( - "file size ({}) limit reached, skip file `{id}` for `{i}` at `{}`", + "file size ({}) limit reached, skip file `{id}` for `{h}` at `{}`", info.len, info.relative_filename.to_string_lossy() ); @@ -154,12 +172,12 @@ async fn main() -> Result<()> { if preload.regex.as_ref().is_some_and(|r| { !r.is_match(&info.relative_filename.to_string_lossy()) }) { - log::debug!("regex filter, skip file `{id}` for `{i}` at `{}`", + log::debug!("regex filter, skip file `{id}` for `{h}` at `{}`", info.relative_filename.to_string_lossy()); continue; } log::debug!( - "keep file `{id}` for `{i}` as `{}`", + "keep file `{id}` for `{h}` as `{}`", 
info.relative_filename.to_string_lossy() ); assert!(keep_files.insert(info.relative_filename.clone())); @@ -169,7 +187,7 @@ async fn main() -> Result<()> { })?; session.update_only_files(&mt, &only_files).await?; session.unpause(&mt).await?; - log::debug!("begin torrent `{i}` preload..."); + log::debug!("begin torrent `{h}` preload..."); if let Err(e) = time::timeout( Duration::from_secs(config.wait_until_completed), mt.wait_until_completed(), @@ -177,46 +195,56 @@ async fn main() -> Result<()> { .await { log::debug!( - "skip awaiting the completion of preload `{i}` data (`{e}`)" + "skip awaiting the completion of preload `{h}` data (`{e}`)" ); + ban.insert(h, Utc::now()); session .delete(librqbit::api::TorrentIdOrHash::Id(id), false) .await?; // * do not collect billions of slow torrents in the session pool continue; } - log::debug!("torrent `{i}` preload completed."); + log::debug!("torrent `{h}` preload completed."); // persist torrent bytes and preloaded content, // cleanup tmp (see rqbit#408) log::debug!( - "persist torrent `{i}` with `{}` files...", + "persist torrent `{h}` with `{}` files...", keep_files.len() ); - preload.commit(&i, bytes, Some(keep_files))?; + preload.commit(&h, bytes, Some(keep_files))?; session .delete(librqbit::api::TorrentIdOrHash::Id(id), false) .await?; // torrent data was moved on commit; there's no sense in keeping it - log::debug!("torrent `{i}` resolved.") + log::debug!("torrent `{h}` resolved.") } Ok(_) => panic!(), - Err(e) => log::debug!("failed to resolve torrent `{i}`: `{e}`."), + Err(e) => { + log::debug!("failed to resolve torrent `{h}`: `{e}`."); + assert!(ban.insert(h, Utc::now()).is_none()); + } }, - Err(e) => log::debug!( - "skip awaiting the completion of adding torrent `{i}` data (`{e}`)" - ), + Err(e) => { + log::debug!( + "skip awaiting the completion of adding torrent `{h}` data (`{e}`)" + ); + assert!(ban.insert(h, Utc::now()).is_none()); + } } } } log::debug!( - "queue completed at {time_queue} (time: {} / 
uptime: {}) await {} seconds to continue...", + "queue completed at {time_queue} (time: {} / uptime: {} / banned: {})", Local::now() .signed_duration_since(time_queue) .as_seconds_f32(), Local::now() .signed_duration_since(time_init) .as_seconds_f32(), - config.sleep, + ban.len(), ); - std::thread::sleep(Duration::from_secs(config.sleep)) + if let Some(sleep) = config.sleep { + log::debug!("await {sleep} seconds to continue..."); + std::thread::sleep(Duration::from_secs(sleep)) + } } }