From 15c8d8c35034eca53026af1eca3f05c1b28421b7 Mon Sep 17 00:00:00 2001 From: yggverse Date: Mon, 7 Jul 2025 19:22:11 +0300 Subject: [PATCH] make api source response optional, implement tests --- src/api.rs | 17 +++- src/main.rs | 263 +++++++++++++++++++++++------------------------- test/api/0.json | 0 test/api/1.json | 1 + 4 files changed, 140 insertions(+), 141 deletions(-) create mode 100644 test/api/0.json create mode 100644 test/api/1.json diff --git a/src/api.rs b/src/api.rs index 595974f..849c765 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,10 +1,17 @@ /// Parse infohash from the source filepath, -/// decode JSON to array on success, return None if the feed is damaged (incomplete) -pub fn infohashes(path: &str) -> anyhow::Result>> { +/// decode JSON to array on success, return None if the feed file is not reachable +pub fn get(path: &str) -> Option> { if path.contains("://") { todo!("URL sources yet not supported") } - let s = std::fs::read_to_string(path)?; - let r: Option> = serde_json::from_str(&s).ok(); - Ok(r) + let s = std::fs::read_to_string(path).ok()?; // is updating? + let r: Option> = serde_json::from_str(&s).ok(); // is incomplete? + r +} + +#[test] +fn test() { + assert!(get("test/api/0.json").is_none()); + assert!(get("test/api/1.json").is_some()); + assert!(get("test/api/2.json").is_none()); } diff --git a/src/main.rs b/src/main.rs index d9f5fd4..65edb4b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -88,145 +88,136 @@ async fn main() -> Result<()> { debug.info(&format!("Index source `{source}`...")); // grab latest info-hashes from this source // * aquatic server may update the stats at this moment, handle result manually - match api::infohashes(source) { - Ok(infohashes) => { - for i in match infohashes { - Some(h) => h, - None => { - // skip without panic - debug.error(&format!( - "The feed `{source}` has an incomplete format (or is still updating); skip." - )); - continue; - } - } { - // is already indexed? - if index.has(&i) { - continue; - } - debug.info(&format!("Index `{i}`...")); - // run the crawler in single thread for performance reasons, - // use `timeout` argument option to skip the dead connections. - match time::timeout( - Duration::from_secs(config.add_torrent_timeout), - session.add_torrent( - AddTorrent::from_url(magnet( - &i, - if config.export_trackers && !trackers.is_empty() { - Some(trackers.list()) - } else { - None - }, - )), - Some(AddTorrentOptions { - paused: true, // continue after `only_files` init - overwrite: true, - disable_trackers: trackers.is_empty(), - initial_peers: peers.initial_peers(), - list_only: preload.as_ref().is_none_or(|p| p.regex.is_none()), - // it is important to blacklist all files preload until initiation - only_files: Some(Vec::with_capacity( - config.preload_max_filecount.unwrap_or_default(), - )), - // the destination folder to preload files match `only_files_regex` - // * e.g. images for audio albums - output_folder: preload - .as_ref() - .map(|p| p.output_folder(&i, true).unwrap()), - ..Default::default() - }), - ), - ) - .await - { - Ok(r) => match r { - // on `preload_regex` case only - Ok(AddTorrentResponse::Added(id, mt)) => { - let mut only_files_size = 0; - let mut only_files_keep = Vec::with_capacity( - config.preload_max_filecount.unwrap_or_default(), - ); - let mut only_files = HashSet::with_capacity( - config.preload_max_filecount.unwrap_or_default(), - ); - mt.wait_until_initialized().await?; - let (name, length) = mt.with_metadata(|m| { - // init preload files list - if let Some(ref p) = preload { - for (id, info) in m.file_infos.iter().enumerate() { - if p.matches(info.relative_filename.to_str().unwrap()) { - if p.max_filesize.is_some_and( - |limit| only_files_size + info.len > limit, - ) { - debug.info(&format!( - "Total files size limit `{i}` reached!" - )); - break; - } - if p.max_filecount.is_some_and( - |limit| only_files.len() + 1 > limit, - ) { - debug.info(&format!( - "Total files count limit for `{i}` reached!" - )); - break; - } - only_files_size += info.len; - if let Some(ref p) = preload { - only_files_keep.push( - p.absolute(&i, &info.relative_filename)) - } - only_files.insert(id); - } - } - } - if let Some(ref t) = torrent { - save_torrent_file(t, &debug, &i, &m.torrent_bytes) - } - (m.info.name.as_ref().map(|n|n.to_string()), m.info.length) - })?; - session.update_only_files(&mt, &only_files).await?; - session.unpause(&mt).await?; - // await for `preload_regex` files download to continue - mt.wait_until_completed().await?; - // remove torrent from session as indexed - session - .delete(librqbit::api::TorrentIdOrHash::Id(id), false) - .await?; - // cleanup irrelevant files (see rqbit#408) - if let Some(p) = &preload { - p.cleanup(&i, Some(only_files_keep))? - } - - index.insert(i, only_files_size, name, length) - } - Ok(AddTorrentResponse::ListOnly(r)) => { - if let Some(ref t) = torrent { - save_torrent_file(t, &debug, &i, &r.torrent_bytes) - } - - // @TODO - // use `r.info` for Memory, SQLite, - // Manticore and other alternative storage type - - index.insert( - i, - 0, - r.info.name.map(|n| n.to_string()), - r.info.length, - ) - } - // unexpected as should be deleted - Ok(AddTorrentResponse::AlreadyManaged(..)) => panic!(), - Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")), - }, - Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")), - } - } + for i in match api::get(source) { + Some(i) => i, + None => { + // skip without panic + debug.error(&format!( + "The feed `{source}` has an incomplete format (or is still updating); skip." + )); + continue; + } + } { + // is already indexed? + if index.has(&i) { + continue; + } + debug.info(&format!("Index `{i}`...")); + // run the crawler in single thread for performance reasons, + // use `timeout` argument option to skip the dead connections. + match time::timeout( + Duration::from_secs(config.add_torrent_timeout), + session.add_torrent( + AddTorrent::from_url(magnet( + &i, + if config.export_trackers && !trackers.is_empty() { + Some(trackers.list()) + } else { + None + }, + )), + Some(AddTorrentOptions { + paused: true, // continue after `only_files` init + overwrite: true, + disable_trackers: trackers.is_empty(), + initial_peers: peers.initial_peers(), + list_only: preload.as_ref().is_none_or(|p| p.regex.is_none()), + // it is important to blacklist all files preload until initiation + only_files: Some(Vec::with_capacity( + config.preload_max_filecount.unwrap_or_default(), + )), + // the destination folder to preload files match `only_files_regex` + // * e.g. images for audio albums + output_folder: preload + .as_ref() + .map(|p| p.output_folder(&i, true).unwrap()), + ..Default::default() + }), + ), + ) + .await + { + Ok(r) => match r { + // on `preload_regex` case only + Ok(AddTorrentResponse::Added(id, mt)) => { + let mut only_files_size = 0; + let mut only_files_keep = Vec::with_capacity( + config.preload_max_filecount.unwrap_or_default(), + ); + let mut only_files = HashSet::with_capacity( + config.preload_max_filecount.unwrap_or_default(), + ); + mt.wait_until_initialized().await?; + let (name, length) = mt.with_metadata(|m| { + // init preload files list + if let Some(ref p) = preload { + for (id, info) in m.file_infos.iter().enumerate() { + if p.matches(info.relative_filename.to_str().unwrap()) { + if p.max_filesize.is_some_and(|limit| { + only_files_size + info.len > limit + }) { + debug.info(&format!( + "Total files size limit `{i}` reached!" + )); + break; + } + if p.max_filecount + .is_some_and(|limit| only_files.len() + 1 > limit) + { + debug.info(&format!( + "Total files count limit for `{i}` reached!" + )); + break; + } + only_files_size += info.len; + if let Some(ref p) = preload { + only_files_keep + .push(p.absolute(&i, &info.relative_filename)) + } + only_files.insert(id); + } + } + } + if let Some(ref t) = torrent { + save_torrent_file(t, &debug, &i, &m.torrent_bytes) + } + (m.info.name.as_ref().map(|n| n.to_string()), m.info.length) + })?; + session.update_only_files(&mt, &only_files).await?; + session.unpause(&mt).await?; + // await for `preload_regex` files download to continue + mt.wait_until_completed().await?; + // remove torrent from session as indexed + session + .delete(librqbit::api::TorrentIdOrHash::Id(id), false) + .await?; + // cleanup irrelevant files (see rqbit#408) + if let Some(p) = &preload { + p.cleanup(&i, Some(only_files_keep))? + } + + index.insert(i, only_files_size, name, length) + } + Ok(AddTorrentResponse::ListOnly(r)) => { + if let Some(ref t) = torrent { + save_torrent_file(t, &debug, &i, &r.torrent_bytes) + } + + // @TODO + // use `r.info` for Memory, SQLite, + // Manticore and other alternative storage type + + index.insert(i, 0, r.info.name.map(|n| n.to_string()), r.info.length) + } + // unexpected as should be deleted + Ok(AddTorrentResponse::AlreadyManaged(..)) => panic!(), + Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")), + }, + Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")), } - Err(e) => debug.error(&format!("API issue for `{source}`: `{e}`")), } } + if let Some(ref export_rss) = config.export_rss && index.is_changed() { diff --git a/test/api/0.json b/test/api/0.json new file mode 100644 index 0000000..e69de29 diff --git a/test/api/1.json b/test/api/1.json new file mode 100644 index 0000000..181a695 --- /dev/null +++ b/test/api/1.json @@ -0,0 +1 @@ +["1","2","3"] \ No newline at end of file