Make API source response optional; implement tests

This commit is contained in:
yggverse 2025-07-07 19:22:11 +03:00
parent 6a3915a3f5
commit 15c8d8c350
4 changed files with 140 additions and 141 deletions

View file

@ -1,10 +1,17 @@
/// Parse infohash from the source filepath, /// Parse infohash from the source filepath,
/// decode JSON to array on success, return None if the feed is damaged (incomplete) /// decode JSON to array on success, return None if the feed file is not reachable
pub fn infohashes(path: &str) -> anyhow::Result<Option<Vec<String>>> { pub fn get(path: &str) -> Option<Vec<String>> {
if path.contains("://") { if path.contains("://") {
todo!("URL sources yet not supported") todo!("URL sources yet not supported")
} }
let s = std::fs::read_to_string(path)?; let s = std::fs::read_to_string(path).ok()?; // is updating?
let r: Option<Vec<String>> = serde_json::from_str(&s).ok(); let r: Option<Vec<String>> = serde_json::from_str(&s).ok(); // is incomplete?
Ok(r) r
}
#[test]
fn test() {
assert!(get("test/api/0.json").is_none());
assert!(get("test/api/1.json").is_some());
assert!(get("test/api/2.json").is_none());
} }

View file

@ -88,145 +88,136 @@ async fn main() -> Result<()> {
debug.info(&format!("Index source `{source}`...")); debug.info(&format!("Index source `{source}`..."));
// grab latest info-hashes from this source // grab latest info-hashes from this source
// * aquatic server may update the stats at this moment, handle result manually // * aquatic server may update the stats at this moment, handle result manually
match api::infohashes(source) { for i in match api::get(source) {
Ok(infohashes) => { Some(i) => i,
for i in match infohashes { None => {
Some(h) => h, // skip without panic
None => { debug.error(&format!(
// skip without panic "The feed `{source}` has an incomplete format (or is still updating); skip."
debug.error(&format!( ));
"The feed `{source}` has an incomplete format (or is still updating); skip." continue;
)); }
continue; } {
} // is already indexed?
} { if index.has(&i) {
// is already indexed? continue;
if index.has(&i) { }
continue; debug.info(&format!("Index `{i}`..."));
} // run the crawler in single thread for performance reasons,
debug.info(&format!("Index `{i}`...")); // use `timeout` argument option to skip the dead connections.
// run the crawler in single thread for performance reasons, match time::timeout(
// use `timeout` argument option to skip the dead connections. Duration::from_secs(config.add_torrent_timeout),
match time::timeout( session.add_torrent(
Duration::from_secs(config.add_torrent_timeout), AddTorrent::from_url(magnet(
session.add_torrent( &i,
AddTorrent::from_url(magnet( if config.export_trackers && !trackers.is_empty() {
&i, Some(trackers.list())
if config.export_trackers && !trackers.is_empty() { } else {
Some(trackers.list()) None
} else { },
None )),
}, Some(AddTorrentOptions {
)), paused: true, // continue after `only_files` init
Some(AddTorrentOptions { overwrite: true,
paused: true, // continue after `only_files` init disable_trackers: trackers.is_empty(),
overwrite: true, initial_peers: peers.initial_peers(),
disable_trackers: trackers.is_empty(), list_only: preload.as_ref().is_none_or(|p| p.regex.is_none()),
initial_peers: peers.initial_peers(), // it is important to blacklist all files preload until initiation
list_only: preload.as_ref().is_none_or(|p| p.regex.is_none()), only_files: Some(Vec::with_capacity(
// it is important to blacklist all files preload until initiation config.preload_max_filecount.unwrap_or_default(),
only_files: Some(Vec::with_capacity( )),
config.preload_max_filecount.unwrap_or_default(), // the destination folder to preload files match `only_files_regex`
)), // * e.g. images for audio albums
// the destination folder to preload files match `only_files_regex` output_folder: preload
// * e.g. images for audio albums .as_ref()
output_folder: preload .map(|p| p.output_folder(&i, true).unwrap()),
.as_ref() ..Default::default()
.map(|p| p.output_folder(&i, true).unwrap()), }),
..Default::default() ),
}), )
), .await
) {
.await Ok(r) => match r {
{ // on `preload_regex` case only
Ok(r) => match r { Ok(AddTorrentResponse::Added(id, mt)) => {
// on `preload_regex` case only let mut only_files_size = 0;
Ok(AddTorrentResponse::Added(id, mt)) => { let mut only_files_keep = Vec::with_capacity(
let mut only_files_size = 0; config.preload_max_filecount.unwrap_or_default(),
let mut only_files_keep = Vec::with_capacity( );
config.preload_max_filecount.unwrap_or_default(), let mut only_files = HashSet::with_capacity(
); config.preload_max_filecount.unwrap_or_default(),
let mut only_files = HashSet::with_capacity( );
config.preload_max_filecount.unwrap_or_default(), mt.wait_until_initialized().await?;
); let (name, length) = mt.with_metadata(|m| {
mt.wait_until_initialized().await?; // init preload files list
let (name, length) = mt.with_metadata(|m| { if let Some(ref p) = preload {
// init preload files list for (id, info) in m.file_infos.iter().enumerate() {
if let Some(ref p) = preload { if p.matches(info.relative_filename.to_str().unwrap()) {
for (id, info) in m.file_infos.iter().enumerate() { if p.max_filesize.is_some_and(|limit| {
if p.matches(info.relative_filename.to_str().unwrap()) { only_files_size + info.len > limit
if p.max_filesize.is_some_and( }) {
|limit| only_files_size + info.len > limit, debug.info(&format!(
) { "Total files size limit `{i}` reached!"
debug.info(&format!( ));
"Total files size limit `{i}` reached!" break;
)); }
break; if p.max_filecount
} .is_some_and(|limit| only_files.len() + 1 > limit)
if p.max_filecount.is_some_and( {
|limit| only_files.len() + 1 > limit, debug.info(&format!(
) { "Total files count limit for `{i}` reached!"
debug.info(&format!( ));
"Total files count limit for `{i}` reached!" break;
)); }
break; only_files_size += info.len;
} if let Some(ref p) = preload {
only_files_size += info.len; only_files_keep
if let Some(ref p) = preload { .push(p.absolute(&i, &info.relative_filename))
only_files_keep.push( }
p.absolute(&i, &info.relative_filename)) only_files.insert(id);
} }
only_files.insert(id); }
} }
} if let Some(ref t) = torrent {
} save_torrent_file(t, &debug, &i, &m.torrent_bytes)
if let Some(ref t) = torrent { }
save_torrent_file(t, &debug, &i, &m.torrent_bytes) (m.info.name.as_ref().map(|n| n.to_string()), m.info.length)
} })?;
(m.info.name.as_ref().map(|n|n.to_string()), m.info.length) session.update_only_files(&mt, &only_files).await?;
})?; session.unpause(&mt).await?;
session.update_only_files(&mt, &only_files).await?; // await for `preload_regex` files download to continue
session.unpause(&mt).await?; mt.wait_until_completed().await?;
// await for `preload_regex` files download to continue // remove torrent from session as indexed
mt.wait_until_completed().await?; session
// remove torrent from session as indexed .delete(librqbit::api::TorrentIdOrHash::Id(id), false)
session .await?;
.delete(librqbit::api::TorrentIdOrHash::Id(id), false) // cleanup irrelevant files (see rqbit#408)
.await?; if let Some(p) = &preload {
// cleanup irrelevant files (see rqbit#408) p.cleanup(&i, Some(only_files_keep))?
if let Some(p) = &preload { }
p.cleanup(&i, Some(only_files_keep))?
} index.insert(i, only_files_size, name, length)
}
index.insert(i, only_files_size, name, length) Ok(AddTorrentResponse::ListOnly(r)) => {
} if let Some(ref t) = torrent {
Ok(AddTorrentResponse::ListOnly(r)) => { save_torrent_file(t, &debug, &i, &r.torrent_bytes)
if let Some(ref t) = torrent { }
save_torrent_file(t, &debug, &i, &r.torrent_bytes)
} // @TODO
// use `r.info` for Memory, SQLite,
// @TODO // Manticore and other alternative storage type
// use `r.info` for Memory, SQLite,
// Manticore and other alternative storage type index.insert(i, 0, r.info.name.map(|n| n.to_string()), r.info.length)
}
index.insert( // unexpected as should be deleted
i, Ok(AddTorrentResponse::AlreadyManaged(..)) => panic!(),
0, Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")),
r.info.name.map(|n| n.to_string()), },
r.info.length, Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")),
)
}
// unexpected as should be deleted
Ok(AddTorrentResponse::AlreadyManaged(..)) => panic!(),
Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")),
},
Err(e) => debug.info(&format!("Skip `{i}`: `{e}`.")),
}
}
} }
Err(e) => debug.error(&format!("API issue for `{source}`: `{e}`")),
} }
} }
if let Some(ref export_rss) = config.export_rss if let Some(ref export_rss) = config.export_rss
&& index.is_changed() && index.is_changed()
{ {

0
test/api/0.json Normal file
View file

1
test/api/1.json Normal file
View file

@ -0,0 +1 @@
["1","2","3"]