diff --git a/Cargo.toml b/Cargo.toml index d5a938d..e510071 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,5 +3,6 @@ resolver = "2" members = [ "crates/crawler", "crates/http", + "crates/llm", "crates/mysql", ] \ No newline at end of file diff --git a/README.md b/README.md index e9ba5b5..ef2f5d3 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Crawl content from RSS feeds into multiple formats ## Components -* [x] `rssto-crawler` - RSS feed reader and data scrapper daemon -* [x] `rssto-http` - Web server implementation based on the Rocket engine -* [x] `rssto-mysql` - Shared database library -* [ ] `rssto-llama` - Feeds auto-translation tool \ No newline at end of file +* `rssto-crawler` - RSS feed reader and data scrapper daemon +* `rssto-http` - Web server implementation based on the Rocket engine +* `rssto-llm` - Feeds auto-translation +* `rssto-mysql` - Shared database library \ No newline at end of file diff --git a/crates/crawler/src/main.rs b/crates/crawler/src/main.rs index eb57623..9156f0f 100644 --- a/crates/crawler/src/main.rs +++ b/crates/crawler/src/main.rs @@ -190,7 +190,7 @@ fn crawl(db: &Mysql, channel_config: &config::Channel) -> Result<()> { db.contents_by_channel_item_id_provider_id(channel_item_id, None, Some(1))? .is_empty() ); - let _content_id = db.insert_content(channel_item_id, None, title, description)?; + let _content_id = db.insert_content(channel_item_id, None, &title, &description)?; // @TODO preload media } Ok(()) diff --git a/crates/http/README.md b/crates/http/README.md index 3034cb8..c9edc0e 100644 --- a/crates/http/README.md +++ b/crates/http/README.md @@ -7,7 +7,8 @@ Web server implementation based on the Rocket engine ``` cd rssto/crates/rssto-http -cargo run -- --mysql-username USER \ - --mysql-password PASS \ - --mysql-database NAME -``` \ No newline at end of file +cargo run -- --mysql-username {USER} \ + --mysql-password {PASS} \ + --mysql-database {NAME} +``` +* optionally, use `--provider-id {ID}` to filter content using post-processing results (e.g. generated by the `rssto-llm` crate) \ No newline at end of file diff --git a/crates/llm/Cargo.toml b/crates/llm/Cargo.toml new file mode 100644 index 0000000..7bc1b53 --- /dev/null +++ b/crates/llm/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "rssto-llm" +version = "0.1.0" +edition = "2024" +license = "MIT" +readme = "README.md" +description = "LLM daemon for the rssto DB translations" +keywords = ["rss", "llm", "translation", "localization", "server"] +categories = ["command-line-utilities", "parsing", "text-processing", "value-formatting"] +repository = "https://github.com/YGGverse/rssto" + +[dependencies] +anyhow = "1.0.100" +chrono = "0.4.42" +clap = { version = "4.5.54", features = ["derive"] } +lancor = "0.1.1" +log = "0.4.29" +mysql = { package = "rssto-mysql", version = "0.1.0", path = "../mysql" } +tokio = { version = "1.0", features = ["full"] } +tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } \ No newline at end of file diff --git a/crates/llm/LICENSE b/crates/llm/LICENSE new file mode 100644 index 0000000..a9c0006 --- /dev/null +++ b/crates/llm/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 YGGverse + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/crates/llm/README.md b/crates/llm/README.md new file mode 100644 index 0000000..901fb66 --- /dev/null +++ b/crates/llm/README.md @@ -0,0 +1,28 @@ +# rssto-llm + +LLM daemon for the rssto DB translations + +> [!NOTE] +> In development! + +1. Setup `rssto-crawler` first and collect initial data + +2. Run LLM server: + +``` +llama-server -hf ggml-org/gemma-3-1b-it-GGUF` +``` + +3. Launch `rssto-llm` to handle `content` DB: + +``` +cd rssto/crates/rssto-llm +cargo run -- --mysql-username {USER} \ + --mysql-password {PASS} \ + --mysql-database {NAME} \ + --llm-host {HOST} \ + --llm-port {PORT} \ + --llm-model {MODEL} \ + --llm-message {MESSAGE} +``` +* see `--help` to display all supported options \ No newline at end of file diff --git a/crates/llm/src/argument.rs b/crates/llm/src/argument.rs new file mode 100644 index 0000000..a3cc51d --- /dev/null +++ b/crates/llm/src/argument.rs @@ -0,0 +1,37 @@ +use clap::Parser; + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +pub struct Argument { + // LLM + #[arg(long, default_value_t = String::from("http"))] + pub llm_scheme: String, + #[arg(long, default_value_t = String::from("localhost"))] + pub llm_host: String, + #[arg(long, default_value_t = 8080)] + pub llm_port: u16, + + /// Model name (e.g. `INSAIT-Institute/MamayLM-Gemma-2-9B-IT-v0.1-GGUF` or `ggml-org/gemma-3-1b-it-GGUF`) + #[arg(long)] + pub llm_model: String, + + /// Initial message for the `content` subject (e.g. `translate to...`) + #[arg(long)] + pub llm_message: String, + + // Database + #[arg(long, default_value_t = String::from("localhost"))] + pub mysql_host: String, + #[arg(long, default_value_t = 3306)] + pub mysql_port: u16, + #[arg(long)] + pub mysql_username: String, + #[arg(long)] + pub mysql_password: String, + #[arg(long)] + pub mysql_database: String, + /// Loop update in seconds + /// * None to exit on complete + #[arg(long, short)] + pub update: Option, +} diff --git a/crates/llm/src/main.rs b/crates/llm/src/main.rs new file mode 100644 index 0000000..8510253 --- /dev/null +++ b/crates/llm/src/main.rs @@ -0,0 +1,103 @@ +mod argument; + +use anyhow::Result; + +#[tokio::main] +async fn main() -> Result<()> { + use argument::Argument; + use chrono::Local; + use clap::Parser; + use lancor::{ChatCompletionRequest, LlamaCppClient, Message}; + use log::{debug, info}; + use mysql::{Mysql, Sort}; + use std::env::var; + + if var("RUST_LOG").is_ok() { + use tracing_subscriber::{EnvFilter, fmt::*}; + struct T; + impl time::FormatTime for T { + fn format_time(&self, w: &mut format::Writer<'_>) -> std::fmt::Result { + write!(w, "{}", Local::now()) + } + } + fmt() + .with_timer(T) + .with_env_filter(EnvFilter::from_default_env()) + .init() + } + + let arg = Argument::parse(); + let db = Mysql::connect( + &arg.mysql_host, + arg.mysql_port, + &arg.mysql_username, + &arg.mysql_password, + &arg.mysql_database, + )?; + let llm = LlamaCppClient::new(format!( + "{}://{}:{}", + arg.llm_scheme, arg.llm_host, arg.llm_port + ))?; + + let provider_id = match db.provider_by_name(&arg.llm_model)? { + Some(p) => { + debug!( + "Use existing DB provider #{} matches model name `{}`", + p.provider_id, &arg.llm_model + ); + p.provider_id + } + None => { + let provider_id = db.insert_provider(&arg.llm_model)?; + info!( + "Provider `{}` not found in database, created new one with ID `{provider_id}`", + &arg.llm_model + ); + provider_id + } + }; + + info!("Daemon started"); + loop { + debug!("New queue begin..."); + for source in db.contents_queue_for_provider_id(provider_id, Sort::Asc, None)? { + debug!( + "Begin generating `content_id` #{} using `provider_id` #{provider_id}.", + source.content_id + ); + + let title = + llm.chat_completion(ChatCompletionRequest::new(&arg.llm_model).message( + Message::user(format!("{}\n{}", arg.llm_message, source.title)), + )) + .await?; + + println!("{}", &title.choices[0].message.content); + + let description = + llm.chat_completion(ChatCompletionRequest::new(&arg.llm_model).message( + Message::user(format!("{}\n{}", arg.llm_message, source.description)), + )) + .await?; + + let content_id = db.insert_content( + source.channel_item_id, + Some(provider_id), + &title.choices[0].message.content, + &description.choices[0].message.content, + )?; + + debug!( + "Created `content_id` #{content_id} using `content_id` #{} source by `provider_id` #{provider_id}.", + source.content_id + ) + } + debug!("Queue completed"); + if let Some(update) = arg.update { + debug!("Wait {update} seconds to continue..."); + std::thread::sleep(std::time::Duration::from_secs(update)) + } else { + return Ok(()); + } + } +} diff --git a/crates/mysql/src/lib.rs b/crates/mysql/src/lib.rs index e601545..f8c6c48 100644 --- a/crates/mysql/src/lib.rs +++ b/crates/mysql/src/lib.rs @@ -108,7 +108,7 @@ impl Mysql { pub fn contents_total_by_provider_id(&self, provider_id: Option) -> Result { let total: Option = self.pool.get_conn()?.exec_first( - "SELECT COUNT(*) FROM `content` WHERE `provider_id` = ?", + "SELECT COUNT(*) FROM `content` WHERE `provider_id` <=> ?", (provider_id,), )?; Ok(total.unwrap_or(0)) @@ -125,12 +125,35 @@ impl Mysql { `channel_item_id`, `provider_id`, `title`, - `description` FROM `content` WHERE `provider_id` = ? ORDER BY `content_id` {sort} LIMIT {}", + `description` FROM `content` WHERE `provider_id` <=> ? ORDER BY `content_id` {sort} LIMIT {}", limit.unwrap_or(DEFAULT_LIMIT) ), (provider_id, )) } + /// Get subjects for `rssto-llm` queue + pub fn contents_queue_for_provider_id( + &self, + provider_id: u64, + sort: Sort, + limit: Option, + ) -> Result, Error> { + self.pool.get_conn()?.exec( + format!( + "SELECT `c1`.`content_id`, + `c1`.`channel_item_id`, + `c1`.`provider_id`, + `c1`.`title`, + `c1`.`description` + FROM `content` AS `c1` WHERE `c1`.`provider_id` IS NULL AND NOT EXISTS ( + SELECT NULL FROM `content` AS `c2` WHERE `c2`.`channel_item_id` = `c1`.`channel_item_id` AND `c2`.`provider_id` = ? LIMIT 1 + ) ORDER BY `c1`.`content_id` {sort} LIMIT {}", + limit.unwrap_or(DEFAULT_LIMIT) + ), + (provider_id,), + ) + } + pub fn contents_by_channel_item_id_provider_id( &self, channel_item_id: u64, @@ -143,7 +166,7 @@ impl Mysql { `channel_item_id`, `provider_id`, `title`, - `description` FROM `content` WHERE `channel_item_id` = ? AND `provider_id` = ? LIMIT {}", + `description` FROM `content` WHERE `channel_item_id` = ? AND `provider_id` <=> ? LIMIT {}", limit.unwrap_or(DEFAULT_LIMIT) ), (channel_item_id, provider_id), @@ -154,8 +177,8 @@ impl Mysql { &self, channel_item_id: u64, provider_id: Option, - title: String, - description: String, + title: &str, + description: &str, ) -> Result { let mut c = self.pool.get_conn()?; c.exec_drop( @@ -164,6 +187,21 @@ impl Mysql { )?; Ok(c.last_insert_id()) } + + pub fn provider_by_name(&self, name: &str) -> Result, Error> { + self.pool.get_conn()?.exec_first( + "SELECT `provider_id`, + `name` + FROM `provider` WHERE `name` = ?", + (name,), + ) + } + + pub fn insert_provider(&self, name: &str) -> Result { + let mut c = self.pool.get_conn()?; + c.exec_drop("INSERT INTO `provider` SET `name` = ?", (name,))?; + Ok(c.last_insert_id()) + } } #[derive(Debug, PartialEq, Eq, FromRow)] @@ -194,6 +232,12 @@ pub struct Content { pub description: String, } +#[derive(Debug, PartialEq, Eq, FromRow)] +pub struct Provider { + pub provider_id: u64, + pub name: String, +} + pub enum Sort { Asc, Desc,