diff --git a/README.md b/README.md
index e821b76..0ef10e4 100644
--- a/README.md
+++ b/README.md
@@ -97,6 +97,11 @@ aquatic-crawler --infohash-file /path/to/info-hash-ipv4.json\
 
           [default: 1000]
 
+  -t
+          Max time in seconds to handle one torrent
+
+          [default: 10]
+
   -s
           Crawl loop delay in seconds
 
diff --git a/src/argument.rs b/src/argument.rs
index c03b5bc..c6f42b3 100644
--- a/src/argument.rs
+++ b/src/argument.rs
@@ -70,6 +70,10 @@ pub struct Argument {
     #[arg(long, default_value_t = 1000)]
     pub index_capacity: usize,
 
+    /// Max time in seconds to handle one torrent
+    #[arg(short, default_value_t = 10)]
+    pub timeout: u64,
+
     /// Crawl loop delay in seconds
     #[arg(short, default_value_t = 300)]
     pub sleep: u64,
diff --git a/src/main.rs b/src/main.rs
index 7d014bc..0b26383 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -14,12 +14,14 @@ async fn main() -> Result<()> {
     use clap::Parser;
     use librqbit::{AddTorrent, AddTorrentOptions, AddTorrentResponse, SessionOptions};
     use std::{collections::HashSet, num::NonZero, time::Duration};
+    use tokio::time;
 
     // init components
     let arg = argument::Argument::parse();
     let debug = Debug::init(&arg.debug)?;
     let peers = peers::Peers::init(&arg.initial_peer)?;
    let storage = Storage::init(&arg.storage, arg.clear)?;
+    let timeout = Duration::from_secs(arg.timeout);
     let trackers = trackers::Trackers::init(&arg.torrent_tracker)?;
     let session = librqbit::Session::new_with_opts(
         storage.path(),
@@ -57,8 +59,10 @@ async fn main() -> Result<()> {
                         continue;
                     }
                     debug.info(&format!("Index `{i}`..."));
-                    match session
-                        .add_torrent(
+                    // run the crawler in single thread, use timeout to skip dead connections
+                    match time::timeout(
+                        timeout,
+                        session.add_torrent(
                             AddTorrent::from_url(format!("magnet:?xt=urn:btih:{i}")),
                             Some(AddTorrentOptions {
                                 overwrite: true,
@@ -77,11 +81,12 @@ async fn main() -> Result<()> {
                                 only_files_regex: arg.preload_regex.clone(),
                                 ..Default::default()
                             }),
-                        )
-                        .await
+                        ),
+                    )
+                    .await
                     {
                         Ok(r) => match r {
-                            AddTorrentResponse::Added(id, mt) => {
+                            Ok(AddTorrentResponse::Added(id, mt)) => {
                                 if arg.save_torrents {
                                     mt.with_metadata(|m| {
                                         save_torrent_file(
@@ -94,27 +99,41 @@ async fn main() -> Result<()> {
                                        // use `r.info` for Memory, SQLite, Manticore and other alternative storage type
                                     })?;
                                 }
-                                mt.wait_until_completed().await?;
-                                session
-                                    .delete(librqbit::api::TorrentIdOrHash::Id(id), false)
-                                    .await?;
-                                index.insert(mt.info_hash().as_string());
+                                match time::timeout(timeout, mt.wait_until_completed()).await {
+                                    Ok(r) => {
+                                        if let Err(e) = r {
+                                            debug.info(&format!("Skip `{i}`: `{e}`."))
+                                        } else {
+                                            session
+                                                .delete(
+                                                    librqbit::api::TorrentIdOrHash::Id(id),
+                                                    false,
+                                                )
+                                                .await?;
+                                            index.insert(mt.info_hash().as_string());
+                                        }
+                                    }
+                                    Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")),
+                                }
                             }
-                            AddTorrentResponse::ListOnly(r) => {
+                            Ok(AddTorrentResponse::ListOnly(r)) => {
                                 if arg.save_torrents {
                                     save_torrent_file(&storage, &debug, &i, &r.torrent_bytes)
                                 }
                                 // @TODO
-                                // use `r.info` for Memory, SQLite, Manticore and other alternative storage type
+                                // use `r.info` for Memory, SQLite,
+                                // Manticore and other alternative storage type
                                 index.insert(r.info_hash.as_string());
                             }
-                            AddTorrentResponse::AlreadyManaged(..) => panic!(), // unexpected as should be deleted
+                            // unexpected as should be deleted
+                            Ok(AddTorrentResponse::AlreadyManaged(..)) => panic!(),
+                            Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")),
                         },
-                        Err(e) => debug.info(&format!("Torrent handle skipped: `{e}`")),
+                        Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")),
                     }
                 }
             }
-            Err(e) => debug.error(&e.to_string()),
+            Err(e) => debug.error(&format!("API issue for `{source}`: `{e}`"))
         }
     }
     debug.info(&format!(