implement tmp ban feature for the unresolvable info-hashes, make sleep timer optional, rename i to h (hash)

yggverse 2025-08-13 15:16:45 +03:00
parent 8a9c928b98
commit d1e93638d9
2 changed files with 57 additions and 25 deletions
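The core of the new ban feature (see the main.rs diff below) is a `HashMap<String, DateTime<Utc>>` keyed by info-hash: a failed resolve inserts the hash with the current timestamp, expired entries are pruned with `retain()` at the start of every crawl queue, and each candidate hash is checked against the map before it is added to the session again. A minimal, self-contained sketch of that pattern outside the crawler (names such as `ban_secs` and the placeholder hashes are illustrative, not from the codebase):

use std::collections::HashMap;

use chrono::{DateTime, Duration, Utc};

fn main() {
    // would normally come from the new `--ban` argument (default 3600 seconds)
    let ban_secs: i64 = 3600;
    let mut ban: HashMap<String, DateTime<Utc>> = HashMap::new();

    // a resolve failure records the info-hash together with the time it failed
    ban.insert("aa00bb11...".to_string(), Utc::now());

    // at the start of every crawl queue, drop entries whose ban window has passed
    let cutoff = Utc::now() - Duration::seconds(ban_secs);
    ban.retain(|hash, banned_at| {
        let keep = *banned_at > cutoff;
        if !keep {
            println!("remove `{hash}` from the ban list (timeout expired)");
        }
        keep
    });

    // every candidate hash is checked against the map before being added again
    let h = String::from("aa00bb11...");
    if ban.contains_key(&h) {
        println!("torrent `{h}` is temporarily banned, skip for this queue");
    }
}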

@@ -99,8 +99,12 @@ pub struct Config {
 pub add_torrent_timeout: u64,
 /// Crawl loop delay in seconds
-#[arg(long, default_value_t = 300)]
-pub sleep: u64,
+#[arg(long)]
+pub sleep: Option<u64>,
+/// Ban unresolvable info-hashes for `n` seconds
+#[arg(long, default_value_t = 3600)]
+pub ban: u64,
 /// Limit download speed (b/s)
 #[arg(long)]
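With this change `--sleep` no longer has a default: clap parses `sleep: Option<u64>` as `None` when the flag is omitted, so the crawler can skip sleeping entirely, while `--ban` falls back to its `default_value_t` of 3600 seconds. A stripped-down sketch of how the two fields behave, assuming clap's derive feature (only the two touched fields are kept; the binary name in the comments is illustrative):

use clap::Parser;

#[derive(Parser, Debug)]
struct Config {
    /// Crawl loop delay in seconds (omit the flag to crawl without sleeping)
    #[arg(long)]
    sleep: Option<u64>,
    /// Ban unresolvable info-hashes for `n` seconds
    #[arg(long, default_value_t = 3600)]
    ban: u64,
}

fn main() {
    let config = Config::parse();
    // `./crawler`             -> sleep: None,      ban: 3600
    // `./crawler --sleep 300` -> sleep: Some(300), ban: 3600
    // `./crawler --ban 60`    -> sleep: None,      ban: 60
    println!("{config:?}");
}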

@@ -3,13 +3,18 @@ mod config;
 mod preload;

 use anyhow::Result;
+use chrono::{DateTime, Utc};
 use config::Config;
 use librqbit::{
     AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, PeerConnectionOptions,
     SessionOptions,
 };
 use preload::Preload;
-use std::{collections::HashSet, num::NonZero, time::Duration};
+use std::{
+    collections::{HashMap, HashSet},
+    num::NonZero,
+    time::Duration,
+};
 use url::Url;

 #[tokio::main]
@@ -59,10 +64,18 @@ async fn main() -> Result<()> {
 },
 )
 .await?;
+let mut ban: HashMap<String, DateTime<Utc>> = HashMap::with_capacity(config.index_capacity);
 log::info!("crawler started at {time_init}");
 loop {
 let time_queue = Local::now();
 log::debug!("queue crawl begin at {time_queue}...");
+ban.retain(|k, &mut v| {
+let is_expired = v <= Utc::now() - Duration::from_secs(config.ban);
+if is_expired {
+log::debug!("remove `{k}` from the ban list (timeout expired)")
+}
+!is_expired
+});
 for source in &config.infohash {
 log::debug!("index source `{source}`...");
 // grab latest info-hashes from this source
@@ -81,18 +94,23 @@
 }
 } {
 // convert to string once
-let i = i.as_string();
+let h = i.as_string();
-if preload.contains_torrent(&i)? {
+if preload.contains_torrent(&h)? {
+log::debug!("torrent `{h}` exists and already indexed, skip.");
 continue;
 }
-log::debug!("index `{i}`...");
+if ban.contains_key(&h) {
+log::debug!("torrent `{h}` is temporary banned, skip for this queue.");
+continue;
+}
+log::debug!("index `{h}`...");
 // run the crawler in single thread for performance reasons,
 // use `timeout` argument option to skip the dead connections.
 match time::timeout(
 Duration::from_secs(config.add_torrent_timeout),
 session.add_torrent(
 AddTorrent::from_url(magnet(
-&i,
+&h,
 if config.tracker.is_empty() {
 None
 } else {
@@ -112,7 +130,7 @@
 // the destination folder to preload files match `preload_regex`
 // * e.g. images for audio albums
 output_folder: preload
-.tmp_dir(&i, true)?
+.tmp_dir(&h, true)?
 .to_str()
 .map(|s| s.to_string()),
 ..Default::default()
@@ -137,7 +155,7 @@
 .is_some_and(|limit| only_files.len() + 1 > limit)
 {
 log::debug!(
-"file count limit ({}) reached, skip file `{id}` for `{i}` at `{}` (and other files after it)",
+"file count limit ({}) reached, skip file `{id}` for `{h}` at `{}` (and other files after it)",
 only_files.len(),
 info.relative_filename.to_string_lossy()
 );
@@ -145,7 +163,7 @@
 }
 if preload.max_filesize.is_some_and(|limit| info.len > limit) {
 log::debug!(
-"file size ({}) limit reached, skip file `{id}` for `{i}` at `{}`",
+"file size ({}) limit reached, skip file `{id}` for `{h}` at `{}`",
 info.len,
 info.relative_filename.to_string_lossy()
 );
@@ -154,12 +172,12 @@
 if preload.regex.as_ref().is_some_and(|r| {
 !r.is_match(&info.relative_filename.to_string_lossy())
 }) {
-log::debug!("regex filter, skip file `{id}` for `{i}` at `{}`",
+log::debug!("regex filter, skip file `{id}` for `{h}` at `{}`",
 info.relative_filename.to_string_lossy());
 continue;
 }
 log::debug!(
-"keep file `{id}` for `{i}` as `{}`",
+"keep file `{id}` for `{h}` as `{}`",
 info.relative_filename.to_string_lossy()
 );
 assert!(keep_files.insert(info.relative_filename.clone()));
@@ -169,7 +187,7 @@
 })?;
 session.update_only_files(&mt, &only_files).await?;
 session.unpause(&mt).await?;
-log::debug!("begin torrent `{i}` preload...");
+log::debug!("begin torrent `{h}` preload...");
 if let Err(e) = time::timeout(
 Duration::from_secs(config.wait_until_completed),
 mt.wait_until_completed(),
@@ -177,46 +195,56 @@
 .await
 {
 log::debug!(
-"skip awaiting the completion of preload `{i}` data (`{e}`)"
+"skip awaiting the completion of preload `{h}` data (`{e}`)"
 );
+ban.insert(h, Utc::now());
 session
 .delete(librqbit::api::TorrentIdOrHash::Id(id), false)
 .await?; // * do not collect billions of slow torrents in the session pool
 continue;
 }
-log::debug!("torrent `{i}` preload completed.");
+log::debug!("torrent `{h}` preload completed.");
 // persist torrent bytes and preloaded content,
 // cleanup tmp (see rqbit#408)
 log::debug!(
-"persist torrent `{i}` with `{}` files...",
+"persist torrent `{h}` with `{}` files...",
 keep_files.len()
 );
-preload.commit(&i, bytes, Some(keep_files))?;
+preload.commit(&h, bytes, Some(keep_files))?;
 session
 .delete(librqbit::api::TorrentIdOrHash::Id(id), false)
 .await?; // torrent data was moved on commit; there's no sense in keeping it
-log::debug!("torrent `{i}` resolved.")
+log::debug!("torrent `{h}` resolved.")
 }
 Ok(_) => panic!(),
-Err(e) => log::debug!("failed to resolve torrent `{i}`: `{e}`."),
+Err(e) => {
+log::debug!("failed to resolve torrent `{h}`: `{e}`.");
+assert!(ban.insert(h, Utc::now()).is_none());
+}
 },
-Err(e) => log::debug!(
-"skip awaiting the completion of adding torrent `{i}` data (`{e}`)"
-),
+Err(e) => {
+log::debug!(
+"skip awaiting the completion of adding torrent `{h}` data (`{e}`)"
+);
+assert!(ban.insert(h, Utc::now()).is_none());
+}
 }
 }
 }
 log::debug!(
-"queue completed at {time_queue} (time: {} / uptime: {}) await {} seconds to continue...",
+"queue completed at {time_queue} (time: {} / uptime: {} / banned: {})",
 Local::now()
 .signed_duration_since(time_queue)
 .as_seconds_f32(),
 Local::now()
 .signed_duration_since(time_init)
 .as_seconds_f32(),
-config.sleep,
+ban.len(),
 );
-std::thread::sleep(Duration::from_secs(config.sleep))
+if let Some(sleep) = config.sleep {
+log::debug!("await {sleep} seconds to continue...");
+std::thread::sleep(Duration::from_secs(sleep))
+}
 }
 }