collect unique ids from the multiple source(s) to handle

This commit is contained in:
yggverse 2025-08-18 21:00:41 +03:00
parent 3643d456d0
commit 79643b84d2

View file

@ -66,6 +66,9 @@ async fn main() -> Result<()> {
loop { loop {
let time_queue = Local::now(); let time_queue = Local::now();
log::debug!("queue crawl begin at {time_queue}..."); log::debug!("queue crawl begin at {time_queue}...");
// build unique ID index from the multiple info-hash sources
let mut queue = HashSet::with_capacity(config.index_capacity);
for source in &config.infohash { for source in &config.infohash {
log::debug!("index source `{source}`..."); log::debug!("index source `{source}`...");
// grab latest info-hashes from this source // grab latest info-hashes from this source
@ -76,67 +79,69 @@ async fn main() -> Result<()> {
i i
} }
None => { None => {
// skip without panic
log::warn!( log::warn!(
"the feed `{source}` has an incomplete format (or is still updating); skip." "the feed `{source}` has an incomplete format (or is still updating); skip."
); );
continue; continue; // skip without panic
} }
} { } {
// convert to string once queue.insert(i);
let h = i.as_string(); }
if preload.contains_torrent(&h)? { }
log::debug!("torrent `{h}` exists, skip.");
continue; // handle
} for i in queue {
// skip banned entry, remove it from the ban list to retry on the next iteration // convert to string once
if ban.remove(&i) { let h = i.as_string();
log::debug!("torrent `{h}` is banned, skip for this queue."); if preload.contains_torrent(&h)? {
continue; log::debug!("torrent `{h}` exists, skip.");
} continue;
log::info!("resolve `{h}`..."); }
// run the crawler in single thread for performance reasons, // skip banned entry, remove it from the ban list to retry on the next iteration
// use `timeout` argument option to skip the dead connections. if ban.remove(&i) {
match time::timeout( log::debug!("torrent `{h}` is banned, skip for this queue.");
Duration::from_secs(config.timeout), continue;
session.add_torrent( }
AddTorrent::from_url(magnet( log::info!("resolve `{h}`...");
&h, // run the crawler in single thread for performance reasons,
if config.tracker.is_empty() { // use `timeout` argument option to skip the dead connections.
None match time::timeout(
} else { Duration::from_secs(config.timeout),
Some(config.tracker.as_ref()) session.add_torrent(
}, AddTorrent::from_url(magnet(
)), &h,
Some(AddTorrentOptions { if config.tracker.is_empty() {
paused: true, // continue after `only_files` update None
overwrite: true, } else {
disable_trackers: config.tracker.is_empty(), Some(config.tracker.as_ref())
initial_peers: config.initial_peer.clone(), },
list_only: false, )),
// the destination folder to preload files match `preload_regex` Some(AddTorrentOptions {
// * e.g. images for audio albums paused: true, // continue after `only_files` update
output_folder: preload overwrite: true,
.tmp_dir(&h, true)? disable_trackers: config.tracker.is_empty(),
.to_str() initial_peers: config.initial_peer.clone(),
.map(|s| s.to_string()), list_only: false,
..Default::default() // the destination folder to preload files match `preload_regex`
}), // * e.g. images for audio albums
), output_folder: preload.tmp_dir(&h, true)?.to_str().map(|s| s.to_string()),
) ..Default::default()
.await }),
{ ),
Ok(r) => match r { )
Ok(AddTorrentResponse::Added(id, mt)) => { .await
assert!(mt.is_paused()); {
let mut keep_files = HashSet::with_capacity( Ok(r) => match r {
config.preload_max_filecount.unwrap_or_default(), Ok(AddTorrentResponse::Added(id, mt)) => {
); assert!(mt.is_paused());
let mut only_files = HashSet::with_capacity( let mut keep_files = HashSet::with_capacity(
config.preload_max_filecount.unwrap_or_default(), config.preload_max_filecount.unwrap_or_default(),
); );
mt.wait_until_initialized().await?; let mut only_files = HashSet::with_capacity(
let bytes = mt.with_metadata(|m| { config.preload_max_filecount.unwrap_or_default(),
);
mt.wait_until_initialized().await?;
let bytes = mt.with_metadata(|m| {
for (id, info) in m.file_infos.iter().enumerate() { for (id, info) in m.file_infos.iter().enumerate() {
if preload if preload
.max_filecount .max_filecount
@ -173,51 +178,47 @@ async fn main() -> Result<()> {
} }
m.torrent_bytes.to_vec() m.torrent_bytes.to_vec()
})?; })?;
session.update_only_files(&mt, &only_files).await?; session.update_only_files(&mt, &only_files).await?;
session.unpause(&mt).await?; session.unpause(&mt).await?;
log::debug!("begin torrent `{h}` preload..."); log::debug!("begin torrent `{h}` preload...");
if let Err(e) = time::timeout( if let Err(e) = time::timeout(
Duration::from_secs(config.timeout), Duration::from_secs(config.timeout),
mt.wait_until_completed(), mt.wait_until_completed(),
) )
.await .await
{ {
log::info!( log::info!(
"skip awaiting the completion of preload torrent data for `{h}` (`{e}`), ban for the next queue.", "skip awaiting the completion of preload torrent data for `{h}` (`{e}`), ban for the next queue.",
);
assert!(ban.insert(i));
session
.delete(librqbit::api::TorrentIdOrHash::Id(id), false)
.await?; // * do not collect billions of slow torrents in the session pool
continue;
}
log::debug!("torrent `{h}` preload completed.");
// persist torrent bytes and preloaded content,
// cleanup tmp (see rqbit#408)
log::debug!(
"persist torrent `{h}` with `{}` files...",
keep_files.len()
); );
preload.commit(&h, bytes, Some(keep_files))?; assert!(ban.insert(i));
session session
.delete(librqbit::api::TorrentIdOrHash::Id(id), false) .delete(librqbit::api::TorrentIdOrHash::Id(id), false)
.await?; // torrent data was moved on commit; there's no sense in keeping it .await?; // * do not collect billions of slow torrents in the session pool
log::info!("torrent `{h}` resolved.") continue;
} }
Ok(_) => panic!(), log::debug!("torrent `{h}` preload completed.");
Err(e) => { // persist torrent bytes and preloaded content,
log::warn!( // cleanup tmp (see rqbit#408)
"failed to resolve torrent `{h}`: `{e}`, ban for the next queue." log::debug!("persist torrent `{h}` with `{}` files...", keep_files.len());
); preload.commit(&h, bytes, Some(keep_files))?;
assert!(ban.insert(i)) session
} .delete(librqbit::api::TorrentIdOrHash::Id(id), false)
}, .await?; // torrent data was moved on commit; there's no sense in keeping it
log::info!("torrent `{h}` resolved.")
}
Ok(_) => panic!(),
Err(e) => { Err(e) => {
log::info!( log::warn!(
"skip awaiting the completion of adding torrent `{h}` (`{e}`), ban for the next queue." "failed to resolve torrent `{h}`: `{e}`, ban for the next queue."
); );
assert!(ban.insert(i)) assert!(ban.insert(i))
} }
},
Err(e) => {
log::info!(
"skip awaiting the completion of adding torrent `{h}` (`{e}`), ban for the next queue."
);
assert!(ban.insert(i))
} }
} }
} }