mod argument; mod config; mod database; use argument::Argument; use chrono::Local; use clap::Parser; use config::Config; use database::Database; use log::*; use rustypipe::client::RustyPipe; use std::{env::var, process::Command, thread, time::Duration}; #[tokio::main] async fn main() { if var("RUST_LOG").is_ok() { use tracing_subscriber::{EnvFilter, fmt::*}; struct T; impl time::FormatTime for T { fn format_time(&self, w: &mut format::Writer<'_>) -> std::fmt::Result { write!(w, "{}", Local::now()) } } fmt() .with_timer(T) .with_env_filter(EnvFilter::from_default_env()) .init() } let argument = Argument::parse(); let config: Config = toml::from_str(&std::fs::read_to_string(&argument.config).unwrap()).unwrap(); let update = config.update.map(Duration::from_secs); let sleep = config.sleep.map(Duration::from_secs); let mut database = Database::new(&config.database).unwrap(); let rp = RustyPipe::new(); loop { info!("begin {} channels update...", config.channel.len()); for (c, channel) in &config.channel { debug!("get `{c}` ({})...", channel.id); match rp.query().channel_videos(&channel.id).await { Ok(result) => { let items = result.content.items; debug!( "received {:?} items to handle, limit: {:?}...", items.len(), channel.items_limit ); for (i, item) in items.into_iter().enumerate() { if channel.items_limit.is_some_and(|l| i >= l) { debug!("items limit for channel `{c}` reached at {i}; break."); break; } if channel .item_name_regex .as_ref() .is_some_and(|regex| regex.is_match(&item.name)) { debug!( "item name `{}` for channel `{c}` does not match pattern `{}`; skip.", &item.name, &channel.item_name_regex.as_ref().unwrap() ); continue; } match database.get(&item.id) { Ok(id) => { if id.is_some() { debug!( "item `{}` for channel `{c}` already processed; skip handle.", item.id ); continue; } } Err(e) => panic!( "can't check item `{}` in DB for channel `{c}`: {e}", item.id ), } if item.is_live != channel.is_live { debug!( "skip item `{}` for channel `{c}` by `is_live` filter (item: {:?}/config:{:?})", item.id, item.is_live, channel.is_live ); continue; } if item.is_short != channel.is_short { debug!( "skip item `{}` for channel `{c}` by `is_short` filter (item: {:?}/config:{:?})", item.id, item.is_short, channel.is_short ); continue; } if item.is_upcoming != channel.is_upcoming { debug!( "skip item `{}` for channel `{c}` by `is_upcoming` filter (item: {:?}/config:{:?})", item.id, item.is_upcoming, channel.is_upcoming ); continue; } for command in &channel.command { let cmd = command.exec.replace("{ID}", &item.id); match Command::new("sh").arg("-c").arg(&cmd).output() { Ok(response) => { if response.status.success() { if command.stdout_contains.as_ref().is_none_or(|s| { String::from_utf8_lossy(&response.stdout).contains(s) }) { match database.process(&item.id) { Ok(()) => info!( "exec `{cmd}` for channel `{c}` successful: `{:?}`", response ), Err(e) => panic!( "can't persist processed item `{}` for channel `{c}` by `{cmd}`: `{e}`", item.id ), } } else { warn!( "unexpected exec `{cmd}` for channel `{c}`: `{:?}`", response ) } } else { warn!( "exec `{cmd}` for channel `{c}` failed: `{:?}`", response ) } } Err(e) => { warn!("can't execute command `{cmd}` for channel `{c}`: `{e}`") } } } if let Some(duration) = sleep { debug!("await {} seconds to continue...", duration.as_secs()); thread::sleep(duration) } } } Err(e) => warn!("can't scrape channel `{c}`: `{e}`"), } } match update { Some(duration) => { info!( "queue completed; await {} seconds to continue...", duration.as_secs() ); thread::sleep(duration) } None => { info!("all tasks completed; exit."); break; } } } }