diff --git a/src/main.rs b/src/main.rs index 01c03a4..2002bcf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,6 +66,9 @@ async fn main() -> Result<()> { loop { let time_queue = Local::now(); log::debug!("queue crawl begin at {time_queue}..."); + + // build unique ID index from the multiple info-hash sources + let mut queue = HashSet::with_capacity(config.index_capacity); for source in &config.infohash { log::debug!("index source `{source}`..."); // grab latest info-hashes from this source @@ -76,67 +79,69 @@ async fn main() -> Result<()> { i } None => { - // skip without panic log::warn!( "the feed `{source}` has an incomplete format (or is still updating); skip." ); - continue; + continue; // skip without panic } } { - // convert to string once - let h = i.as_string(); - if preload.contains_torrent(&h)? { - log::debug!("torrent `{h}` exists, skip."); - continue; - } - // skip banned entry, remove it from the ban list to retry on the next iteration - if ban.remove(&i) { - log::debug!("torrent `{h}` is banned, skip for this queue."); - continue; - } - log::info!("resolve `{h}`..."); - // run the crawler in single thread for performance reasons, - // use `timeout` argument option to skip the dead connections. - match time::timeout( - Duration::from_secs(config.timeout), - session.add_torrent( - AddTorrent::from_url(magnet( - &h, - if config.tracker.is_empty() { - None - } else { - Some(config.tracker.as_ref()) - }, - )), - Some(AddTorrentOptions { - paused: true, // continue after `only_files` update - overwrite: true, - disable_trackers: config.tracker.is_empty(), - initial_peers: config.initial_peer.clone(), - list_only: false, - // the destination folder to preload files match `preload_regex` - // * e.g. images for audio albums - output_folder: preload - .tmp_dir(&h, true)? - .to_str() - .map(|s| s.to_string()), - ..Default::default() - }), - ), - ) - .await - { - Ok(r) => match r { - Ok(AddTorrentResponse::Added(id, mt)) => { - assert!(mt.is_paused()); - let mut keep_files = HashSet::with_capacity( - config.preload_max_filecount.unwrap_or_default(), - ); - let mut only_files = HashSet::with_capacity( - config.preload_max_filecount.unwrap_or_default(), - ); - mt.wait_until_initialized().await?; - let bytes = mt.with_metadata(|m| { + queue.insert(i); + } + } + + // handle + for i in queue { + // convert to string once + let h = i.as_string(); + if preload.contains_torrent(&h)? { + log::debug!("torrent `{h}` exists, skip."); + continue; + } + // skip banned entry, remove it from the ban list to retry on the next iteration + if ban.remove(&i) { + log::debug!("torrent `{h}` is banned, skip for this queue."); + continue; + } + log::info!("resolve `{h}`..."); + // run the crawler in single thread for performance reasons, + // use `timeout` argument option to skip the dead connections. + match time::timeout( + Duration::from_secs(config.timeout), + session.add_torrent( + AddTorrent::from_url(magnet( + &h, + if config.tracker.is_empty() { + None + } else { + Some(config.tracker.as_ref()) + }, + )), + Some(AddTorrentOptions { + paused: true, // continue after `only_files` update + overwrite: true, + disable_trackers: config.tracker.is_empty(), + initial_peers: config.initial_peer.clone(), + list_only: false, + // the destination folder to preload files match `preload_regex` + // * e.g. images for audio albums + output_folder: preload.tmp_dir(&h, true)?.to_str().map(|s| s.to_string()), + ..Default::default() + }), + ), + ) + .await + { + Ok(r) => match r { + Ok(AddTorrentResponse::Added(id, mt)) => { + assert!(mt.is_paused()); + let mut keep_files = HashSet::with_capacity( + config.preload_max_filecount.unwrap_or_default(), + ); + let mut only_files = HashSet::with_capacity( + config.preload_max_filecount.unwrap_or_default(), + ); + mt.wait_until_initialized().await?; + let bytes = mt.with_metadata(|m| { for (id, info) in m.file_infos.iter().enumerate() { if preload .max_filecount @@ -173,51 +178,47 @@ async fn main() -> Result<()> { } m.torrent_bytes.to_vec() })?; - session.update_only_files(&mt, &only_files).await?; - session.unpause(&mt).await?; - log::debug!("begin torrent `{h}` preload..."); - if let Err(e) = time::timeout( - Duration::from_secs(config.timeout), - mt.wait_until_completed(), - ) - .await - { - log::info!( - "skip awaiting the completion of preload torrent data for `{h}` (`{e}`), ban for the next queue.", - ); - assert!(ban.insert(i)); - session - .delete(librqbit::api::TorrentIdOrHash::Id(id), false) - .await?; // * do not collect billions of slow torrents in the session pool - continue; - } - log::debug!("torrent `{h}` preload completed."); - // persist torrent bytes and preloaded content, - // cleanup tmp (see rqbit#408) - log::debug!( - "persist torrent `{h}` with `{}` files...", - keep_files.len() + session.update_only_files(&mt, &only_files).await?; + session.unpause(&mt).await?; + log::debug!("begin torrent `{h}` preload..."); + if let Err(e) = time::timeout( + Duration::from_secs(config.timeout), + mt.wait_until_completed(), + ) + .await + { + log::info!( + "skip awaiting the completion of preload torrent data for `{h}` (`{e}`), ban for the next queue.", ); - preload.commit(&h, bytes, Some(keep_files))?; + assert!(ban.insert(i)); session .delete(librqbit::api::TorrentIdOrHash::Id(id), false) - .await?; // torrent data was moved on commit; there's no sense in keeping it - log::info!("torrent `{h}` resolved.") + .await?; // * do not collect billions of slow torrents in the session pool + continue; } - Ok(_) => panic!(), - Err(e) => { - log::warn!( - "failed to resolve torrent `{h}`: `{e}`, ban for the next queue." - ); - assert!(ban.insert(i)) - } - }, + log::debug!("torrent `{h}` preload completed."); + // persist torrent bytes and preloaded content, + // cleanup tmp (see rqbit#408) + log::debug!("persist torrent `{h}` with `{}` files...", keep_files.len()); + preload.commit(&h, bytes, Some(keep_files))?; + session + .delete(librqbit::api::TorrentIdOrHash::Id(id), false) + .await?; // torrent data was moved on commit; there's no sense in keeping it + log::info!("torrent `{h}` resolved.") + } + Ok(_) => panic!(), Err(e) => { - log::info!( - "skip awaiting the completion of adding torrent `{h}` (`{e}`), ban for the next queue." + log::warn!( + "failed to resolve torrent `{h}`: `{e}`, ban for the next queue." ); assert!(ban.insert(i)) } + }, + Err(e) => { + log::info!( + "skip awaiting the completion of adding torrent `{h}` (`{e}`), ban for the next queue." + ); + assert!(ban.insert(i)) } } }