wrap index implementation, skip RSS file update if the index is not changed (saves SSD life by preventing extra write operations)

This commit is contained in:
yggverse 2025-07-06 23:33:31 +03:00
parent ff7bb4c94f
commit b2b69ca9e7
2 changed files with 73 additions and 33 deletions

View file

@ -1,7 +1,68 @@
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use std::collections::HashMap;
pub struct Index { pub struct Value {
pub time: DateTime<Utc>, pub time: DateTime<Utc>,
pub node: u64, pub node: u64,
pub name: Option<String>, pub name: Option<String>,
} }
/// Collect processed info hashes to skip on the next iterations (for this session)
/// * also contains optional meta info to export the index as RSS or any other format
pub struct Index {
    // infohash -> meta info (timestamp, node size, optional name)
    index: HashMap<String, Value>,
    /// Track index changes to prevent extra disk write operations (saves SSD life)
    /// * useful in the static RSS feed generation case, if enabled.
    is_changed: bool,
}
impl Index {
    /// Create an empty index pre-allocated for `capacity` entries.
    pub fn init(capacity: usize) -> Self {
        Self {
            index: HashMap::with_capacity(capacity),
            is_changed: false,
        }
    }

    /// Return `true` if `infohash` was already processed in this session.
    pub fn has(&self, infohash: &str) -> bool {
        self.index.contains_key(infohash)
    }

    /// Return `true` if the index was modified since the last `refresh` call;
    /// used to skip redundant disk writes (e.g. static RSS feed regeneration).
    pub fn is_changed(&self) -> bool {
        self.is_changed
    }

    /// Borrow the underlying infohash -> meta map (e.g. for RSS export).
    pub fn list(&self) -> &HashMap<String, Value> {
        &self.index
    }

    /// Number of indexed infohashes.
    pub fn len(&self) -> usize {
        self.index.len()
    }

    /// Return `true` if nothing has been indexed yet.
    /// Companion to `len` (clippy: `len_without_is_empty`).
    pub fn is_empty(&self) -> bool {
        self.index.is_empty()
    }

    /// Sum of the `node` sizes over all indexed entries
    /// (total size in bytes, as accumulated by `insert` calls).
    pub fn nodes(&self) -> u64 {
        self.index.values().map(|v| v.node).sum()
    }

    /// Register `infohash` with its meta info, timestamped now.
    /// Sets the change flag only for previously unseen infohashes,
    /// so overwriting an existing key does not trigger a rewrite.
    pub fn insert(&mut self, infohash: String, node: u64, name: Option<String>) {
        let value = Value {
            time: Utc::now(),
            node,
            name,
        };
        if self.index.insert(infohash, value).is_none() {
            self.is_changed = true;
        }
    }

    /// Reset the change flag; call once per main-loop iteration,
    /// before processing sources.
    pub fn refresh(&mut self) {
        self.is_changed = false;
        // @TODO implement also index cleanup by Value timeout
    }
}

View file

@ -8,15 +8,10 @@ mod storage;
mod trackers; mod trackers;
use anyhow::Result; use anyhow::Result;
use chrono::Utc;
use debug::Debug; use debug::Debug;
use index::Index; use index::Index;
use rss::Rss; use rss::Rss;
use std::{ use std::{collections::HashSet, num::NonZero, time::Duration};
collections::{HashMap, HashSet},
num::NonZero,
time::Duration,
};
use storage::Storage; use storage::Storage;
use url::Url; use url::Url;
@ -64,11 +59,10 @@ async fn main() -> Result<()> {
// begin // begin
debug.info("Crawler started"); debug.info("Crawler started");
// collect processed info hashes to skip on the next iterations (for this session) let mut index = Index::init(arg.index_capacity);
// * also contains optional meta info to export index as RSS or any other format
let mut index = HashMap::with_capacity(arg.index_capacity);
loop { loop {
debug.info("Index queue begin..."); debug.info("Index queue begin...");
index.refresh();
for source in &arg.infohash_file { for source in &arg.infohash_file {
debug.info(&format!("Index source `{source}`...")); debug.info(&format!("Index source `{source}`..."));
// grab latest info-hashes from this source // grab latest info-hashes from this source
@ -77,7 +71,7 @@ async fn main() -> Result<()> {
Ok(infohashes) => { Ok(infohashes) => {
for i in infohashes { for i in infohashes {
// is already indexed? // is already indexed?
if index.contains_key(&i) { if index.has(&i) {
continue; continue;
} }
debug.info(&format!("Index `{i}`...")); debug.info(&format!("Index `{i}`..."));
@ -170,14 +164,7 @@ async fn main() -> Result<()> {
// cleanup irrelevant files (see rqbit#408) // cleanup irrelevant files (see rqbit#408)
storage.cleanup(&i, Some(only_files_keep))?; storage.cleanup(&i, Some(only_files_keep))?;
index.insert( index.insert(i, only_files_size, name)
i,
Index {
time: Utc::now(),
node: only_files_size,
name,
},
);
} }
Ok(AddTorrentResponse::ListOnly(r)) => { Ok(AddTorrentResponse::ListOnly(r)) => {
if arg.save_torrents { if arg.save_torrents {
@ -188,14 +175,7 @@ async fn main() -> Result<()> {
// use `r.info` for Memory, SQLite, // use `r.info` for Memory, SQLite,
// Manticore and other alternative storage type // Manticore and other alternative storage type
index.insert( index.insert(i, 0, r.info.name.map(|n| n.to_string()))
i,
Index {
time: Utc::now(),
node: 0,
name: r.info.name.map(|n| n.to_string()),
},
);
} }
// unexpected as should be deleted // unexpected as should be deleted
Ok(AddTorrentResponse::AlreadyManaged(..)) => panic!(), Ok(AddTorrentResponse::AlreadyManaged(..)) => panic!(),
@ -208,7 +188,9 @@ async fn main() -> Result<()> {
Err(e) => debug.error(&format!("API issue for `{source}`: `{e}`")), Err(e) => debug.error(&format!("API issue for `{source}`: `{e}`")),
} }
} }
if let Some(ref export_rss) = arg.export_rss { if let Some(ref export_rss) = arg.export_rss
&& index.is_changed()
{
let mut rss = Rss::new( let mut rss = Rss::new(
export_rss, export_rss,
&arg.export_rss_title, &arg.export_rss_title,
@ -216,7 +198,7 @@ async fn main() -> Result<()> {
&arg.export_rss_description, &arg.export_rss_description,
Some(trackers.clone()), Some(trackers.clone()),
)?; )?;
for (k, v) in &index { for (k, v) in index.list() {
rss.push( rss.push(
k, k,
v.name.as_ref().unwrap_or(k), v.name.as_ref().unwrap_or(k),
@ -226,10 +208,7 @@ async fn main() -> Result<()> {
} }
rss.commit()? rss.commit()?
} }
if arg if arg.preload_total_size.is_some_and(|s| index.nodes() > s) {
.preload_total_size
.is_some_and(|s| index.values().map(|i| i.node).sum::<u64>() > s)
{
panic!("Preload content size {} bytes reached!", 0) panic!("Preload content size {} bytes reached!", 0)
} }
debug.info(&format!( debug.info(&format!(