implement torrent resolve timeout with argument option

This commit is contained in:
yggverse 2025-06-15 04:10:13 +03:00
parent 307a935d7f
commit ce36c5dd87
3 changed files with 43 additions and 15 deletions

View file

@ -97,6 +97,11 @@ aquatic-crawler --infohash-file /path/to/info-hash-ipv4.json\
[default: 1000] [default: 1000]
-t <TIMEOUT>
Max time to handle one torrent
[default: 10]
-s <SLEEP> -s <SLEEP>
Crawl loop delay in seconds Crawl loop delay in seconds

View file

@ -70,6 +70,10 @@ pub struct Argument {
#[arg(long, default_value_t = 1000)] #[arg(long, default_value_t = 1000)]
pub index_capacity: usize, pub index_capacity: usize,
/// Max time to handle one torrent
#[arg(short, default_value_t = 10)]
pub timeout: u64,
/// Crawl loop delay in seconds /// Crawl loop delay in seconds
#[arg(short, default_value_t = 300)] #[arg(short, default_value_t = 300)]
pub sleep: u64, pub sleep: u64,

View file

@ -14,12 +14,14 @@ async fn main() -> Result<()> {
use clap::Parser; use clap::Parser;
use librqbit::{AddTorrent, AddTorrentOptions, AddTorrentResponse, SessionOptions}; use librqbit::{AddTorrent, AddTorrentOptions, AddTorrentResponse, SessionOptions};
use std::{collections::HashSet, num::NonZero, time::Duration}; use std::{collections::HashSet, num::NonZero, time::Duration};
use tokio::time;
// init components // init components
let arg = argument::Argument::parse(); let arg = argument::Argument::parse();
let debug = Debug::init(&arg.debug)?; let debug = Debug::init(&arg.debug)?;
let peers = peers::Peers::init(&arg.initial_peer)?; let peers = peers::Peers::init(&arg.initial_peer)?;
let storage = Storage::init(&arg.storage, arg.clear)?; let storage = Storage::init(&arg.storage, arg.clear)?;
let timeout = Duration::from_secs(arg.timeout);
let trackers = trackers::Trackers::init(&arg.torrent_tracker)?; let trackers = trackers::Trackers::init(&arg.torrent_tracker)?;
let session = librqbit::Session::new_with_opts( let session = librqbit::Session::new_with_opts(
storage.path(), storage.path(),
@ -57,8 +59,10 @@ async fn main() -> Result<()> {
continue; continue;
} }
debug.info(&format!("Index `{i}`...")); debug.info(&format!("Index `{i}`..."));
match session // run the crawler in single thread, use timeout to skip dead connections
.add_torrent( match time::timeout(
timeout,
session.add_torrent(
AddTorrent::from_url(format!("magnet:?xt=urn:btih:{i}")), AddTorrent::from_url(format!("magnet:?xt=urn:btih:{i}")),
Some(AddTorrentOptions { Some(AddTorrentOptions {
overwrite: true, overwrite: true,
@ -77,11 +81,12 @@ async fn main() -> Result<()> {
only_files_regex: arg.preload_regex.clone(), only_files_regex: arg.preload_regex.clone(),
..Default::default() ..Default::default()
}), }),
) ),
.await )
.await
{ {
Ok(r) => match r { Ok(r) => match r {
AddTorrentResponse::Added(id, mt) => { Ok(AddTorrentResponse::Added(id, mt)) => {
if arg.save_torrents { if arg.save_torrents {
mt.with_metadata(|m| { mt.with_metadata(|m| {
save_torrent_file( save_torrent_file(
@ -94,27 +99,41 @@ async fn main() -> Result<()> {
// use `r.info` for Memory, SQLite, Manticore and other alternative storage type // use `r.info` for Memory, SQLite, Manticore and other alternative storage type
})?; })?;
} }
mt.wait_until_completed().await?; match time::timeout(timeout, mt.wait_until_completed()).await {
session Ok(r) => {
.delete(librqbit::api::TorrentIdOrHash::Id(id), false) if let Err(e) = r {
.await?; debug.info(&format!("Skip `{i}`: `{e}`."))
index.insert(mt.info_hash().as_string()); } else {
session
.delete(
librqbit::api::TorrentIdOrHash::Id(id),
false,
)
.await?;
index.insert(mt.info_hash().as_string());
}
}
Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")),
}
} }
AddTorrentResponse::ListOnly(r) => { Ok(AddTorrentResponse::ListOnly(r)) => {
if arg.save_torrents { if arg.save_torrents {
save_torrent_file(&storage, &debug, &i, &r.torrent_bytes) save_torrent_file(&storage, &debug, &i, &r.torrent_bytes)
} }
// @TODO // @TODO
// use `r.info` for Memory, SQLite, Manticore and other alternative storage type // use `r.info` for Memory, SQLite,
// Manticore and other alternative storage type
index.insert(r.info_hash.as_string()); index.insert(r.info_hash.as_string());
} }
AddTorrentResponse::AlreadyManaged(..) => panic!(), // unexpected as should be deleted // unexpected as should be deleted
Ok(AddTorrentResponse::AlreadyManaged(..)) => panic!(),
Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")),
}, },
Err(e) => debug.info(&format!("Torrent handle skipped: `{e}`")), Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")),
} }
} }
} }
Err(e) => debug.error(&e.to_string()), Err(e) => debug.error(&format!("API issue for `{source}`: `{e}`")),
} }
} }
debug.info(&format!( debug.info(&format!(