diff --git a/.gitignore b/.gitignore index 22f827d..b8b6c05 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ *.json +*.redb /target \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 103ff6c..3523426 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1247,6 +1247,15 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "redb" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67f7f231ea7b1172b7ac00ccf96b1250f0fb5a16d5585836aa4ebc997df7cbde" +dependencies = [ + "libc", +] + [[package]] name = "regex" version = "1.12.3" @@ -2439,13 +2448,14 @@ dependencies = [ [[package]] name = "ytd" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "chrono", "clap", "env_logger", "log", + "redb", "rustypipe", "serde", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 1b7c069..979153b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ytd" -version = "0.1.0" +version = "0.2.0" edition = "2024" license = "MIT" readme = "README.md" @@ -15,6 +15,7 @@ chrono = "0.4.44" clap = { version = "4.6", features = ["derive"] } env_logger = "0.11.10" log = "0.4.29" +redb = "4.0.0" rustypipe = "0.11.4" serde = { version = "1.0.228", features = ["derive"] } tokio = { version = "1.51.0", features = ["rt-multi-thread"] } diff --git a/example/config.toml b/example/config.toml index 68c3ebd..2f515d7 100644 --- a/example/config.toml +++ b/example/config.toml @@ -1,3 +1,6 @@ +# Persist processed entries between sessions +database = "database.redb" + # Update channels queue, in seconds (activates daemon mode) # * unset or comment to run once # update = 60 @@ -6,7 +9,7 @@ [channel.test] id = "UCl2mFZoRqjw_ELax4Yisf6w" # channel ID -is_live = true +is_live = false is_short = false is_upcoming = false diff --git a/src/config.rs b/src/config.rs index 80927f0..5bdc140 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,11 +2,13 @@ mod channel; use channel::Channel; use serde::Deserialize; -use std::collections::HashMap; +use std::{collections::HashMap, path::PathBuf}; #[derive(Deserialize)] pub struct Config { pub channel: HashMap, + /// Persist processed entries between sessions + pub database: PathBuf, /// Repeat delay in seconds (activates daemon mode) /// * None to run once pub update: Option, diff --git a/src/database.rs b/src/database.rs new file mode 100644 index 0000000..330eabf --- /dev/null +++ b/src/database.rs @@ -0,0 +1,35 @@ +use std::path::PathBuf; + +use chrono::Utc; +use redb::{Database as Db, Error, ReadableDatabase, TableDefinition}; + +const PROCESSED: TableDefinition<&str, i64> = TableDefinition::new("processed"); + +pub struct Database(Db); + +impl Database { + pub fn new(path: &PathBuf) -> Result { + let db = Db::create(path)?; + + Ok(Self(db)) + } + + pub fn get(&self, item_id: &str) -> Result, Error> { + let read = self.0.begin_read()?; + let table = read.open_table(PROCESSED)?; + Ok(table.get(item_id)?.map(|v| v.value())) + } + + pub fn is_processed(&self, item_id: &str) -> Result { + Ok(self.get(item_id)?.is_some()) + } + + pub fn process(&mut self, item_id: &str) -> Result<(), Error> { + let write = self.0.begin_write()?; + { + let mut table = write.open_table(PROCESSED)?; + table.insert(item_id, Utc::now().timestamp())?; + } + Ok(write.commit()?) + } +} diff --git a/src/main.rs b/src/main.rs index 007ce25..b991bd1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,12 @@ mod argument; mod config; +mod database; use argument::Argument; use chrono::Local; use clap::Parser; use config::Config; +use database::Database; use log::*; use rustypipe::client::RustyPipe; use std::{env::var, process::Command, time::Duration}; @@ -29,18 +31,44 @@ async fn main() { let config: Config = toml::from_str(&std::fs::read_to_string(&argument.config).unwrap()).unwrap(); let update = config.update.map(Duration::from_secs); + let mut database = Database::new(&config.database).unwrap(); let rp = RustyPipe::new(); loop { for (c, channel) in &config.channel { + debug!("handle channel `{c}` ({})...", channel.id); match rp.query().channel_videos(&channel.id).await { Ok(result) => { - for item in result.content.items.iter().filter(|i| { - i.is_live == channel.is_live - && i.is_short == channel.is_short - && i.is_upcoming == channel.is_upcoming - }) { + for item in result.content.items { + if database.is_processed(&item.id).is_ok_and(|r| r) { + debug!( + "item `{}` for channel `{c}` already processed; skip handle.", + item.id + ); + continue; + } + if item.is_live != channel.is_live { + debug!( + "skip item `{}` for channel `{c}` by `is_live` filter (item: {:?}/config:{:?})", + item.id, item.is_live, channel.is_live + ); + continue; + } + if item.is_short != channel.is_short { + debug!( + "skip item `{}` for channel `{c}` by `is_short` filter (item: {:?}/config:{:?})", + item.id, item.is_short, channel.is_short + ); + continue; + } + if item.is_upcoming != channel.is_upcoming { + debug!( + "skip item `{}` for channel `{c}` by `is_upcoming` filter (item: {:?}/config:{:?})", + item.id, item.is_upcoming, channel.is_upcoming + ); + continue; + } for command in &channel.command { let cmd = command.exec.replace("{ID}", &item.id); match Command::new("sh").arg("-c").arg(&cmd).output() { @@ -49,10 +77,16 @@ async fn main() { if command.stdout_contains.as_ref().is_none_or(|s| { String::from_utf8_lossy(&response.stdout).contains(s) }) { - debug!( - "command `{cmd}` for channel `{c}` successful: `{:?}`", - response - ) + match database.process(&item.id) { + Ok(()) => debug!( + "command `{cmd}` for channel `{c}` successful: `{:?}`", + response + ), + Err(e) => error!( + "can't persist processed item `{}` for channel `{c}` by `{cmd}`: `{e}`", + item.id + ), + } } else { warn!( "unexpected command `{cmd}` for channel `{c}`: `{:?}`",