implement persistent database to skip handling of already-processed entries

This commit is contained in:
yggverse 2026-04-06 22:35:36 +03:00
parent c160c30ee5
commit 2ea58cc9c3
7 changed files with 99 additions and 13 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
*.json
*.redb
/target

12
Cargo.lock generated
View file

@ -1247,6 +1247,15 @@ dependencies = [
"getrandom 0.3.4",
]
[[package]]
name = "redb"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67f7f231ea7b1172b7ac00ccf96b1250f0fb5a16d5585836aa4ebc997df7cbde"
dependencies = [
"libc",
]
[[package]]
name = "regex"
version = "1.12.3"
@ -2439,13 +2448,14 @@ dependencies = [
[[package]]
name = "ytd"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"anyhow",
"chrono",
"clap",
"env_logger",
"log",
"redb",
"rustypipe",
"serde",
"tokio",

View file

@ -1,6 +1,6 @@
[package]
name = "ytd"
version = "0.1.0"
version = "0.2.0"
edition = "2024"
license = "MIT"
readme = "README.md"
@ -15,6 +15,7 @@ chrono = "0.4.44"
clap = { version = "4.6", features = ["derive"] }
env_logger = "0.11.10"
log = "0.4.29"
redb = "4.0.0"
rustypipe = "0.11.4"
serde = { version = "1.0.228", features = ["derive"] }
tokio = { version = "1.51.0", features = ["rt-multi-thread"] }

View file

@ -1,3 +1,6 @@
# Persist processed entries between sessions
database = "database.redb"
# Update channels queue, in seconds (activates daemon mode)
# * unset or comment to run once
# update = 60
@ -6,7 +9,7 @@
[channel.test]
id = "UCl2mFZoRqjw_ELax4Yisf6w" # channel ID
is_live = true
is_live = false
is_short = false
is_upcoming = false

View file

@ -2,11 +2,13 @@ mod channel;
use channel::Channel;
use serde::Deserialize;
use std::collections::HashMap;
use std::{collections::HashMap, path::PathBuf};
#[derive(Deserialize)]
pub struct Config {
pub channel: HashMap<String, Channel>,
/// Persist processed entries between sessions
pub database: PathBuf,
/// Repeat delay in seconds (activates daemon mode)
/// * None to run once
pub update: Option<u64>,

35
src/database.rs Normal file
View file

@ -0,0 +1,35 @@
use std::path::PathBuf;
use chrono::Utc;
use redb::{Database as Db, Error, ReadableDatabase, TableDefinition};
const PROCESSED: TableDefinition<&str, i64> = TableDefinition::new("processed");
pub struct Database(Db);
impl Database {
pub fn new(path: &PathBuf) -> Result<Self, Error> {
let db = Db::create(path)?;
Ok(Self(db))
}
pub fn get(&self, item_id: &str) -> Result<Option<i64>, Error> {
let read = self.0.begin_read()?;
let table = read.open_table(PROCESSED)?;
Ok(table.get(item_id)?.map(|v| v.value()))
}
pub fn is_processed(&self, item_id: &str) -> Result<bool, Error> {
Ok(self.get(item_id)?.is_some())
}
pub fn process(&mut self, item_id: &str) -> Result<(), Error> {
let write = self.0.begin_write()?;
{
let mut table = write.open_table(PROCESSED)?;
table.insert(item_id, Utc::now().timestamp())?;
}
Ok(write.commit()?)
}
}

View file

@ -1,10 +1,12 @@
mod argument;
mod config;
mod database;
use argument::Argument;
use chrono::Local;
use clap::Parser;
use config::Config;
use database::Database;
use log::*;
use rustypipe::client::RustyPipe;
use std::{env::var, process::Command, time::Duration};
@ -29,18 +31,44 @@ async fn main() {
let config: Config =
toml::from_str(&std::fs::read_to_string(&argument.config).unwrap()).unwrap();
let update = config.update.map(Duration::from_secs);
let mut database = Database::new(&config.database).unwrap();
let rp = RustyPipe::new();
loop {
for (c, channel) in &config.channel {
debug!("handle channel `{c}` ({})...", channel.id);
match rp.query().channel_videos(&channel.id).await {
Ok(result) => {
for item in result.content.items.iter().filter(|i| {
i.is_live == channel.is_live
&& i.is_short == channel.is_short
&& i.is_upcoming == channel.is_upcoming
}) {
for item in result.content.items {
if database.is_processed(&item.id).is_ok_and(|r| r) {
debug!(
"item `{}` for channel `{c}` already processed; skip handle.",
item.id
);
continue;
}
if item.is_live != channel.is_live {
debug!(
"skip item `{}` for channel `{c}` by `is_live` filter (item: {:?}/config:{:?})",
item.id, item.is_live, channel.is_live
);
continue;
}
if item.is_short != channel.is_short {
debug!(
"skip item `{}` for channel `{c}` by `is_short` filter (item: {:?}/config:{:?})",
item.id, item.is_short, channel.is_short
);
continue;
}
if item.is_upcoming != channel.is_upcoming {
debug!(
"skip item `{}` for channel `{c}` by `is_upcoming` filter (item: {:?}/config:{:?})",
item.id, item.is_upcoming, channel.is_upcoming
);
continue;
}
for command in &channel.command {
let cmd = command.exec.replace("{ID}", &item.id);
match Command::new("sh").arg("-c").arg(&cmd).output() {
@ -49,10 +77,16 @@ async fn main() {
if command.stdout_contains.as_ref().is_none_or(|s| {
String::from_utf8_lossy(&response.stdout).contains(s)
}) {
debug!(
"command `{cmd}` for channel `{c}` successful: `{:?}`",
response
)
match database.process(&item.id) {
Ok(()) => debug!(
"command `{cmd}` for channel `{c}` successful: `{:?}`",
response
),
Err(e) => error!(
"can't persist processed item `{}` for channel `{c}` by `{cmd}`: `{e}`",
item.id
),
}
} else {
warn!(
"unexpected command `{cmd}` for channel `{c}`: `{:?}`",