From f48e256fadd82cdf7a643bf4bf1f30feba9e33a0 Mon Sep 17 00:00:00 2001 From: yggverse Date: Fri, 9 Jan 2026 22:35:06 +0200 Subject: [PATCH] separate Pollable and Transactional features, separate table members, use single-connection transactions method in the crawler and llm crates, minor crawler optimizations such as disconnect from db server on each queue iteration complete --- crates/crawler/Cargo.toml | 2 +- crates/crawler/src/main.rs | 44 ++-- crates/http/src/main.rs | 10 +- crates/llm/Cargo.toml | 2 +- crates/llm/src/main.rs | 116 +++++----- crates/mysql/Cargo.toml | 7 +- crates/mysql/src/lib.rs | 338 +----------------------------- crates/mysql/src/pollable.rs | 114 ++++++++++ crates/mysql/src/pollable/sort.rs | 13 ++ crates/mysql/src/table.rs | 53 +++++ crates/mysql/src/transactional.rs | 148 +++++++++++++ 11 files changed, 438 insertions(+), 409 deletions(-) create mode 100644 crates/mysql/src/pollable.rs create mode 100644 crates/mysql/src/pollable/sort.rs create mode 100644 crates/mysql/src/table.rs create mode 100644 crates/mysql/src/transactional.rs diff --git a/crates/crawler/Cargo.toml b/crates/crawler/Cargo.toml index 11d6062..1de8c3d 100644 --- a/crates/crawler/Cargo.toml +++ b/crates/crawler/Cargo.toml @@ -14,7 +14,7 @@ anyhow = "1.0.100" chrono = "0.4.42" clap = { version = "4.5.54", features = ["derive"] } log = "0.4.29" -mysql = { package = "rssto-mysql", version = "0.1.0", path = "../mysql" } +mysql = { package = "rssto-mysql", version = "0.1.0", features = ["transactional"], path = "../mysql" } reqwest = { version = "0.13.1", features = ["blocking"] } rss = "2.0.12" scraper = { version = "0.25.0", features = ["serde"] } diff --git a/crates/crawler/src/main.rs b/crates/crawler/src/main.rs index 9156f0f..25c8279 100644 --- a/crates/crawler/src/main.rs +++ b/crates/crawler/src/main.rs @@ -3,7 +3,7 @@ mod config; use anyhow::Result; use log::{debug, info, warn}; -use mysql::Mysql; +use mysql::Transactional; use reqwest::blocking::get; fn main() -> Result<()> { @@ -28,22 +28,27 @@ fn main() -> Result<()> { let argument = Argument::parse(); let config: config::Config = toml::from_str(&read_to_string(argument.config)?)?; - let database = Mysql::connect( - &config.mysql.host, - config.mysql.port, - &config.mysql.user, - &config.mysql.password, - &config.mysql.database, - )?; info!("Crawler started"); loop { debug!("Begin new crawl queue..."); - for c in &config.channel { - debug!("Update `{}`...", c.url); - if let Err(e) = crawl(&database, c) { - warn!("Channel `{}` update failed: `{e}`", c.url) + { + // disconnect from the database immediately when exiting this scope, + // in case the `update` queue is enabled and pending for a while. + let mut db = Transactional::connect( + &config.mysql.host, + config.mysql.port, + &config.mysql.user, + &config.mysql.password, + &config.mysql.database, + )?; + for c in &config.channel { + debug!("Update `{}`...", c.url); + if let Err(e) = crawl(&mut db, c) { + warn!("Channel `{}` update failed: `{e}`", c.url) + } } + db.commit()? } debug!("Crawl queue completed"); if let Some(update) = config.update { @@ -55,7 +60,7 @@ fn main() -> Result<()> { } } -fn crawl(db: &Mysql, channel_config: &config::Channel) -> Result<()> { +fn crawl(db: &mut Transactional, channel_config: &config::Channel) -> Result<()> { use rss::Channel; use scraper::Selector; @@ -82,8 +87,8 @@ fn crawl(db: &Mysql, channel_config: &config::Channel) -> Result<()> { let channel_items_limit = channel_config.items_limit.unwrap_or(channel_items.len()); - let channel_id = match db.channels_by_url(&channel_url, Some(1))?.first() { - Some(result) => result.channel_id, + let channel_id = match db.channel_id_by_url(&channel_url)? { + Some(channel_id) => channel_id, None => db.insert_channel(&channel_url)?, }; @@ -115,10 +120,7 @@ fn crawl(db: &Mysql, channel_config: &config::Channel) -> Result<()> { continue; } }; - if !db - .channel_items_by_channel_id_guid(channel_id, guid, Some(1))? - .is_empty() - { + if db.channel_items_total_by_channel_id_guid(channel_id, guid)? > 0 { continue; // skip next steps as processed } let channel_item_id = db.insert_channel_item( @@ -186,10 +188,6 @@ fn crawl(db: &Mysql, channel_config: &config::Channel) -> Result<()> { } }, }; - assert!( - db.contents_by_channel_item_id_provider_id(channel_item_id, None, Some(1))? - .is_empty() - ); let _content_id = db.insert_content(channel_item_id, None, &title, &description)?; // @TODO preload media } diff --git a/crates/http/src/main.rs b/crates/http/src/main.rs index 0c04491..3964aea 100644 --- a/crates/http/src/main.rs +++ b/crates/http/src/main.rs @@ -11,7 +11,7 @@ use config::Config; use feed::Feed; use global::Global; use meta::Meta; -use mysql::{Mysql, Sort}; +use mysql::{Pollable, pollable::Sort}; use rocket::{State, http::Status, response::content::RawXml, serde::Serialize}; use rocket_dyn_templates::{Template, context}; @@ -19,7 +19,7 @@ use rocket_dyn_templates::{Template, context}; fn index( search: Option<&str>, page: Option, - db: &State, + db: &State, meta: &State, global: &State, ) -> Result { @@ -92,7 +92,7 @@ fn index( #[get("/")] fn info( content_id: u64, - db: &State, + db: &State, meta: &State, global: &State, ) -> Result { @@ -123,7 +123,7 @@ fn rss( search: Option<&str>, global: &State, meta: &State, - db: &State, + db: &State, ) -> Result, Status> { let mut f = Feed::new( &meta.title, @@ -165,7 +165,7 @@ fn rocket() -> _ { } }) .manage( - Mysql::connect( + Pollable::connect( &config.mysql_host, config.mysql_port, &config.mysql_username, diff --git a/crates/llm/Cargo.toml b/crates/llm/Cargo.toml index 7bc1b53..a5fa968 100644 --- a/crates/llm/Cargo.toml +++ b/crates/llm/Cargo.toml @@ -15,6 +15,6 @@ chrono = "0.4.42" clap = { version = "4.5.54", features = ["derive"] } lancor = "0.1.1" log = "0.4.29" -mysql = { package = "rssto-mysql", version = "0.1.0", path = "../mysql" } +mysql = { package = "rssto-mysql", version = "0.1.0", features = ["transactional"], path = "../mysql" } tokio = { version = "1.0", features = ["full"] } tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } \ No newline at end of file diff --git a/crates/llm/src/main.rs b/crates/llm/src/main.rs index 4caf4e4..dbcfd59 100644 --- a/crates/llm/src/main.rs +++ b/crates/llm/src/main.rs @@ -1,15 +1,16 @@ mod argument; use anyhow::Result; +use argument::Argument; +use mysql::Transactional; #[tokio::main] async fn main() -> Result<()> { - use argument::Argument; use chrono::Local; use clap::Parser; use lancor::{ChatCompletionRequest, LlamaCppClient, Message}; use log::{debug, info}; - use mysql::{Mysql, Sort}; + use std::env::var; if var("RUST_LOG").is_ok() { @@ -27,68 +28,73 @@ async fn main() -> Result<()> { } let arg = Argument::parse(); - let db = Mysql::connect( - &arg.mysql_host, - arg.mysql_port, - &arg.mysql_username, - &arg.mysql_password, - &arg.mysql_database, - )?; let llm = LlamaCppClient::new(format!( "{}://{}:{}", arg.llm_scheme, arg.llm_host, arg.llm_port ))?; - let provider_id = match db.provider_by_name(&arg.llm_model)? { - Some(p) => { - debug!( - "Use existing DB provider #{} matches model name `{}`", - p.provider_id, &arg.llm_model - ); - p.provider_id - } - None => { - let provider_id = db.insert_provider(&arg.llm_model)?; - info!( - "Provider `{}` not found in database, created new one with ID `{provider_id}`", - &arg.llm_model - ); - provider_id + // find existing ID by model name or create a new one + // * this feature should be moved to a separate CLI tool + let provider_id = { + let mut db = tx(&arg)?; + match db.provider_id_by_name(&arg.llm_model)? { + Some(provider_id) => { + debug!( + "Use existing DB provider #{} matches model name `{}`", + provider_id, &arg.llm_model + ); + provider_id + } + None => { + let provider_id = db.insert_provider(&arg.llm_model)?; + info!( + "Provider `{}` not found in database, created new one with ID `{provider_id}`", + &arg.llm_model + ); + db.commit()?; + provider_id + } } }; info!("Daemon started"); loop { debug!("New queue begin..."); - for source in db.contents_queue_for_provider_id(provider_id, Sort::Asc, None)? { - debug!( - "Begin generating `content_id` #{} using `provider_id` #{provider_id}.", - source.content_id - ); + { + // disconnect from the database immediately when exiting this scope, + // in case the `update` queue is enabled and pending for a while. + let mut db = tx(&arg)?; + for source in db.contents_queue_for_provider_id(provider_id)? { + debug!( + "Begin generating `content_id` #{} using `provider_id` #{provider_id}.", + source.content_id + ); - let title = - llm.chat_completion(ChatCompletionRequest::new(&arg.llm_model).message( - Message::user(format!("{}\n{}", arg.llm_message, source.title)), - )) - .await?; + let title = llm + .chat_completion(ChatCompletionRequest::new(&arg.llm_model).message( + Message::user(format!("{}\n{}", arg.llm_message, source.title)), + )) + .await?; - let description = - llm.chat_completion(ChatCompletionRequest::new(&arg.llm_model).message( - Message::user(format!("{}\n{}", arg.llm_message, source.description)), - )) - .await?; + let description = llm + .chat_completion(ChatCompletionRequest::new(&arg.llm_model).message( + Message::user(format!("{}\n{}", arg.llm_message, source.description)), + )) + .await?; - let content_id = db.insert_content( - source.channel_item_id, - Some(provider_id), - &title.choices[0].message.content, - &description.choices[0].message.content, - )?; + let content_id = db.insert_content( + source.channel_item_id, + Some(provider_id), + &title.choices[0].message.content, + &description.choices[0].message.content, + )?; - debug!( - "Created `content_id` #{content_id} using `content_id` #{} source by `provider_id` #{provider_id}.", - source.content_id - ) + debug!( + "Created `content_id` #{content_id} using `content_id` #{} source by `provider_id` #{provider_id}.", + source.content_id + ) + } + db.commit()? } debug!("Queue completed"); if let Some(update) = arg.update { @@ -99,3 +105,15 @@ async fn main() -> Result<()> { } } } + +// in fact, there is no need for a transaction at this moment, +// as there are no related table updates, but who knows what the future holds +fn tx(arg: &Argument) -> Result { + Ok(Transactional::connect( + &arg.mysql_host, + arg.mysql_port, + &arg.mysql_username, + &arg.mysql_password, + &arg.mysql_database, + )?) +} diff --git a/crates/mysql/Cargo.toml b/crates/mysql/Cargo.toml index ddce0cc..7aeb4af 100644 --- a/crates/mysql/Cargo.toml +++ b/crates/mysql/Cargo.toml @@ -9,5 +9,10 @@ keywords = ["rssto", "database", "mysql", "library", "driver", "api"] # categories = [] repository = "https://github.com/YGGverse/rssto" +[features] +default = ["pollable"] +pollable = [] +transactional = [] + [dependencies] -mysql = "26.0.1" +mysql = "26.0.1" \ No newline at end of file diff --git a/crates/mysql/src/lib.rs b/crates/mysql/src/lib.rs index a0cbdb4..c316798 100644 --- a/crates/mysql/src/lib.rs +++ b/crates/mysql/src/lib.rs @@ -1,333 +1,13 @@ -use mysql::{ - Error, Pool, - prelude::{FromRow, Queryable}, -}; +#[cfg(feature = "pollable")] +pub mod pollable; -pub struct Mysql { - pool: Pool, -} +pub mod table; -impl Mysql { - pub fn connect( - host: &str, - port: u16, - user: &str, - password: &str, - database: &str, - ) -> Result { - Ok(Self { - pool: mysql::Pool::new( - format!("mysql://{user}:{password}@{host}:{port}/{database}").as_str(), - )?, - }) - } +#[cfg(feature = "transactional")] +pub mod transactional; - pub fn channels_by_url(&self, url: &str, limit: Option) -> Result, Error> { - self.pool.get_conn()?.exec( - format!( - "SELECT `channel_id`, `url` FROM `channel` WHERE `url` = ? LIMIT {}", - limit.unwrap_or(DEFAULT_LIMIT) - ), - (url,), - ) - } +#[cfg(feature = "pollable")] +pub use pollable::Pollable; - pub fn insert_channel(&self, url: &str) -> Result { - let mut c = self.pool.get_conn()?; - c.exec_drop("INSERT INTO `channel` SET `url` = ?", (url,))?; - Ok(c.last_insert_id()) - } - - pub fn channel_item(&self, channel_item_id: u64) -> Result, Error> { - self.pool.get_conn()?.exec_first( - "SELECT `channel_item_id`, - `channel_id`, - `pub_date`, - `guid`, - `link`, - `title`, - `description` FROM `channel_item` WHERE `channel_item_id` = ?", - (channel_item_id,), - ) - } - - pub fn channel_items_by_channel_id_guid( - &self, - channel_id: u64, - guid: &str, - limit: Option, - ) -> Result, Error> { - self.pool.get_conn()?.exec( - format!( - "SELECT `channel_item_id`, - `channel_id`, - `pub_date`, - `guid`, - `link`, - `title`, - `description` FROM `channel_item` WHERE `channel_id` = ? AND `guid` = ? LIMIT {}", - limit.unwrap_or(DEFAULT_LIMIT) - ), - (channel_id, guid), - ) - } - - pub fn insert_channel_item( - &self, - channel_id: u64, - pub_date: i64, - guid: &str, - link: &str, - title: Option<&str>, - description: Option<&str>, - ) -> Result { - let mut c = self.pool.get_conn()?; - c.exec_drop( - "INSERT INTO `channel_item` SET `channel_id` = ?, - `pub_date` = ?, - `guid` = ?, - `link` = ?, - `title` = ?, - `description` = ?", - (channel_id, pub_date, guid, link, title, description), - )?; - Ok(c.last_insert_id()) - } - - pub fn content(&self, content_id: u64) -> Result, Error> { - self.pool.get_conn()?.exec_first( - "SELECT `content_id`, - `channel_item_id`, - `provider_id`, - `title`, - `description` FROM `content` WHERE `content_id` = ?", - (content_id,), - ) - } - - pub fn contents_total_by_provider_id( - &self, - provider_id: Option, - keyword: Option<&str>, - ) -> Result { - let total: Option = self.pool.get_conn()?.exec_first( - "SELECT COUNT(*) FROM `content` WHERE `provider_id` <=> ? AND `title` LIKE ?", - (provider_id, like(keyword)), - )?; - Ok(total.unwrap_or(0)) - } - - pub fn contents_by_provider_id( - &self, - provider_id: Option, - keyword: Option<&str>, - sort: Sort, - limit: Option, - ) -> Result, Error> { - self.pool.get_conn()?.exec(format!( - "SELECT `content_id`, - `channel_item_id`, - `provider_id`, - `title`, - `description` FROM `content` WHERE `provider_id` <=> ? AND `title` LIKE ? ORDER BY `content_id` {sort} LIMIT {}", - limit.unwrap_or(DEFAULT_LIMIT) - ), - (provider_id, like(keyword), )) - } - - /// Get subjects for `rssto-llm` queue - pub fn contents_queue_for_provider_id( - &self, - provider_id: u64, - sort: Sort, - limit: Option, - ) -> Result, Error> { - self.pool.get_conn()?.exec( - format!( - "SELECT `c1`.`content_id`, - `c1`.`channel_item_id`, - `c1`.`provider_id`, - `c1`.`title`, - `c1`.`description` - FROM `content` AS `c1` WHERE `c1`.`provider_id` IS NULL AND NOT EXISTS ( - SELECT NULL FROM `content` AS `c2` WHERE `c2`.`channel_item_id` = `c1`.`channel_item_id` AND `c2`.`provider_id` = ? LIMIT 1 - ) ORDER BY `c1`.`content_id` {sort} LIMIT {}", - limit.unwrap_or(DEFAULT_LIMIT) - ), - (provider_id,), - ) - } - - pub fn contents_by_channel_item_id_provider_id( - &self, - channel_item_id: u64, - provider_id: Option, - limit: Option, - ) -> Result, Error> { - self.pool.get_conn()?.exec( - format!( - "SELECT `content_id`, - `channel_item_id`, - `provider_id`, - `title`, - `description` FROM `content` - WHERE `channel_item_id` = ? AND `provider_id` <=> ? LIMIT {}", - limit.unwrap_or(DEFAULT_LIMIT) - ), - (channel_item_id, provider_id), - ) - } - - pub fn insert_content( - &self, - channel_item_id: u64, - provider_id: Option, - title: &str, - description: &str, - ) -> Result { - let mut c = self.pool.get_conn()?; - c.exec_drop( - "INSERT INTO `content` SET `channel_item_id` = ?, - `provider_id` = ?, - `title` = ?, - `description` = ?", - (channel_item_id, provider_id, title, description), - )?; - Ok(c.last_insert_id()) - } - - pub fn content_image(&self, content_image_id: u64) -> Result, Error> { - self.pool.get_conn()?.exec_first( - "SELECT `content_image_id`, - `content_id`, - `image_id`, - `data`, - `source` FROM `content_image` - JOIN `image` ON (`image`.`image_id` = `content_image`.`image_id`) - WHERE `content_image_id` = ? LIMIT 1", - (content_image_id,), - ) - } - - pub fn insert_content_image(&self, content_id: u64, image_id: u64) -> Result { - let mut c = self.pool.get_conn()?; - c.exec_drop( - "INSERT INTO `content_image` SET `content_id` = ?, `image_id` = ?", - (content_id, image_id), - )?; - Ok(c.last_insert_id()) - } - - pub fn image_by_source(&self, source: &str) -> Result, Error> { - self.pool.get_conn()?.exec_first( - "SELECT `image_id`, - `source`, - `data` FROM `image` WHERE `source` = ? LIMIT 1", - (source,), - ) - } - - pub fn images(&self, limit: Option) -> Result, Error> { - self.pool.get_conn()?.query(format!( - "SELECT `image_id`, `source`, `data` FROM `image` LIMIT {}", - limit.unwrap_or(DEFAULT_LIMIT) - )) - } - - pub fn insert_image(&self, source: &str, data: &[u8]) -> Result { - let mut c = self.pool.get_conn()?; - c.exec_drop( - "INSERT INTO `image` SET `source` = ?, `data` = ?", - (source, data), - )?; - Ok(c.last_insert_id()) - } - - pub fn provider_by_name(&self, name: &str) -> Result, Error> { - self.pool.get_conn()?.exec_first( - "SELECT `provider_id`, - `name` - FROM `provider` WHERE `name` = ?", - (name,), - ) - } - - pub fn insert_provider(&self, name: &str) -> Result { - let mut c = self.pool.get_conn()?; - c.exec_drop("INSERT INTO `provider` SET `name` = ?", (name,))?; - Ok(c.last_insert_id()) - } -} - -#[derive(Debug, PartialEq, Eq, FromRow)] -pub struct Channel { - pub channel_id: u64, - pub url: String, -} - -#[derive(Debug, PartialEq, Eq, FromRow)] -pub struct ChannelItem { - pub channel_item_id: u64, - pub channel_id: u64, - pub pub_date: i64, - pub guid: String, - pub link: String, - pub title: Option, - pub description: Option, -} - -#[derive(Debug, PartialEq, Eq, FromRow)] -pub struct Content { - pub content_id: u64, - pub channel_item_id: u64, - /// None if the original `title` and `description` values - /// parsed from the channel item on crawl - pub provider_id: Option, - pub title: String, - pub description: String, -} - -#[derive(Debug, PartialEq, Eq, FromRow)] -pub struct Provider { - pub provider_id: u64, - pub name: String, -} - -#[derive(Debug, PartialEq, Eq, FromRow)] -pub struct Image { - pub image_id: u64, - pub source: String, - pub data: Vec, -} - -/// Includes joined `image` table members -#[derive(Debug, PartialEq, Eq, FromRow)] -pub struct ContentImage { - pub content_image_id: u64, - pub content_id: u64, - pub image_id: u64, - // Image members (JOIN) - pub data: Vec, - pub source: String, -} - -pub enum Sort { - Asc, - Desc, -} - -impl std::fmt::Display for Sort { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - Self::Asc => write!(f, "ASC"), - Self::Desc => write!(f, "DESC"), - } - } -} - -/// Shared search logic -fn like(value: Option<&str>) -> String { - value.map_or("%".into(), |k| format!("{k}%")) -} - -const DEFAULT_LIMIT: usize = 100; +#[cfg(feature = "transactional")] +pub use transactional::Transactional; diff --git a/crates/mysql/src/pollable.rs b/crates/mysql/src/pollable.rs new file mode 100644 index 0000000..474c427 --- /dev/null +++ b/crates/mysql/src/pollable.rs @@ -0,0 +1,114 @@ +pub mod sort; + +pub use sort::Sort; + +use crate::table::*; +use mysql::{Error, Pool, prelude::Queryable}; + +/// Safe, read-only operations used in client apps like `rssto-http` +pub struct Pollable { + pool: Pool, +} + +impl Pollable { + pub fn connect( + host: &str, + port: u16, + user: &str, + password: &str, + database: &str, + ) -> Result { + Ok(Self { + pool: mysql::Pool::new( + format!("mysql://{user}:{password}@{host}:{port}/{database}").as_str(), + )?, + }) + } + + pub fn channel_item(&self, channel_item_id: u64) -> Result, Error> { + self.pool.get_conn()?.exec_first( + "SELECT `channel_item_id`, + `channel_id`, + `pub_date`, + `guid`, + `link`, + `title`, + `description` FROM `channel_item` WHERE `channel_item_id` = ?", + (channel_item_id,), + ) + } + + pub fn content(&self, content_id: u64) -> Result, Error> { + self.pool.get_conn()?.exec_first( + "SELECT `content_id`, + `channel_item_id`, + `provider_id`, + `title`, + `description` FROM `content` WHERE `content_id` = ?", + (content_id,), + ) + } + + pub fn contents_total_by_provider_id( + &self, + provider_id: Option, + keyword: Option<&str>, + ) -> Result { + let total: Option = self.pool.get_conn()?.exec_first( + "SELECT COUNT(*) FROM `content` WHERE `provider_id` <=> ? AND `title` LIKE ?", + (provider_id, like(keyword)), + )?; + Ok(total.unwrap_or(0)) + } + + pub fn contents_by_provider_id( + &self, + provider_id: Option, + keyword: Option<&str>, + sort: Sort, + limit: Option, + ) -> Result, Error> { + self.pool.get_conn()?.exec(format!( + "SELECT `content_id`, + `channel_item_id`, + `provider_id`, + `title`, + `description` FROM `content` WHERE `provider_id` <=> ? AND `title` LIKE ? ORDER BY `content_id` {sort} LIMIT {}", + limit.unwrap_or(DEFAULT_LIMIT) + ), + (provider_id, like(keyword), )) + } + + pub fn content_image(&self, content_image_id: u64) -> Result, Error> { + self.pool.get_conn()?.exec_first( + "SELECT `content_image_id`, + `content_id`, + `image_id`, + `data`, + `source` FROM `content_image` + JOIN `image` ON (`image`.`image_id` = `content_image`.`image_id`) + WHERE `content_image_id` = ? LIMIT 1", + (content_image_id,), + ) + } + + pub fn images(&self, limit: Option) -> Result, Error> { + self.pool.get_conn()?.query(format!( + "SELECT `image_id`, `source`, `data` FROM `image` LIMIT {}", + limit.unwrap_or(DEFAULT_LIMIT) + )) + } + + pub fn insert_provider(&self, name: &str) -> Result { + let mut c = self.pool.get_conn()?; + c.exec_drop("INSERT INTO `provider` SET `name` = ?", (name,))?; + Ok(c.last_insert_id()) + } +} + +/// Shared search logic +fn like(value: Option<&str>) -> String { + value.map_or("%".into(), |k| format!("{k}%")) +} + +const DEFAULT_LIMIT: usize = 100; diff --git a/crates/mysql/src/pollable/sort.rs b/crates/mysql/src/pollable/sort.rs new file mode 100644 index 0000000..d8b121d --- /dev/null +++ b/crates/mysql/src/pollable/sort.rs @@ -0,0 +1,13 @@ +pub enum Sort { + Asc, + Desc, +} + +impl std::fmt::Display for Sort { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Self::Asc => write!(f, "ASC"), + Self::Desc => write!(f, "DESC"), + } + } +} diff --git a/crates/mysql/src/table.rs b/crates/mysql/src/table.rs new file mode 100644 index 0000000..5df3348 --- /dev/null +++ b/crates/mysql/src/table.rs @@ -0,0 +1,53 @@ +use mysql::prelude::FromRow; + +#[derive(Debug, PartialEq, Eq, FromRow)] +pub struct Channel { + pub channel_id: u64, + pub url: String, +} + +#[derive(Debug, PartialEq, Eq, FromRow)] +pub struct ChannelItem { + pub channel_item_id: u64, + pub channel_id: u64, + pub pub_date: i64, + pub guid: String, + pub link: String, + pub title: Option, + pub description: Option, +} + +#[derive(Debug, PartialEq, Eq, FromRow)] +pub struct Content { + pub content_id: u64, + pub channel_item_id: u64, + /// None if the original `title` and `description` values + /// parsed from the channel item on crawl + pub provider_id: Option, + pub title: String, + pub description: String, +} + +#[derive(Debug, PartialEq, Eq, FromRow)] +pub struct Provider { + pub provider_id: u64, + pub name: String, +} + +#[derive(Debug, PartialEq, Eq, FromRow)] +pub struct Image { + pub image_id: u64, + pub source: String, + pub data: Vec, +} + +/// Includes joined `image` table members +#[derive(Debug, PartialEq, Eq, FromRow)] +pub struct ContentImage { + pub content_image_id: u64, + pub content_id: u64, + pub image_id: u64, + // Image members (JOIN) + pub data: Vec, + pub source: String, +} diff --git a/crates/mysql/src/transactional.rs b/crates/mysql/src/transactional.rs new file mode 100644 index 0000000..ce80305 --- /dev/null +++ b/crates/mysql/src/transactional.rs @@ -0,0 +1,148 @@ +use crate::table::*; +use mysql::{Error, Pool, Transaction, TxOpts, prelude::Queryable}; + +/// Safe, optimized read/write operations +/// mostly required by the `rssto-crawler` and `rssto-llm` +/// * all members implementation requires `commit` action +pub struct Transactional { + tx: Transaction<'static>, +} + +impl Transactional { + pub fn connect( + host: &str, + port: u16, + user: &str, + password: &str, + database: &str, + ) -> Result { + Ok(Self { + tx: Pool::new(format!("mysql://{user}:{password}@{host}:{port}/{database}").as_str())? + .start_transaction(TxOpts::default())?, + }) + } + + pub fn commit(self) -> Result<(), Error> { + self.tx.commit() + } + + pub fn channel_id_by_url(&mut self, url: &str) -> Result, Error> { + self.tx.exec_first( + "SELECT `channel_id` FROM `channel` WHERE `url` = ? LIMIT 1", + (url,), + ) + } + + pub fn insert_channel(&mut self, url: &str) -> Result { + self.tx + .exec_drop("INSERT INTO `channel` SET `url` = ?", (url,))?; + Ok(self.tx.last_insert_id().unwrap()) + } + + pub fn channel_items_total_by_channel_id_guid( + &mut self, + channel_id: u64, + guid: &str, + ) -> Result { + Ok(self + .tx + .exec_first( + "SELECT COUNT(*) FROM `channel_item` WHERE `channel_id` = ? AND `guid` = ?", + (channel_id, guid), + )? + .unwrap_or(0)) + } + + pub fn insert_channel_item( + &mut self, + channel_id: u64, + pub_date: i64, + guid: &str, + link: &str, + title: Option<&str>, + description: Option<&str>, + ) -> Result { + self.tx.exec_drop( + "INSERT INTO `channel_item` SET `channel_id` = ?, + `pub_date` = ?, + `guid` = ?, + `link` = ?, + `title` = ?, + `description` = ?", + (channel_id, pub_date, guid, link, title, description), + )?; + Ok(self.tx.last_insert_id().unwrap()) + } + + pub fn contents_queue_for_provider_id( + &mut self, + provider_id: u64, + ) -> Result, Error> { + self.tx.exec( + "SELECT `c1`.`content_id`, + `c1`.`channel_item_id`, + `c1`.`provider_id`, + `c1`.`title`, + `c1`.`description` + FROM `content` AS `c1` WHERE `c1`.`provider_id` IS NULL AND NOT EXISTS ( + SELECT NULL FROM `content` AS `c2` + WHERE `c2`.`channel_item_id` = `c1`.`channel_item_id` + AND `c2`.`provider_id` = ? LIMIT 1 + )", + (provider_id,), + ) + } + + pub fn insert_content( + &mut self, + channel_item_id: u64, + provider_id: Option, + title: &str, + description: &str, + ) -> Result { + self.tx.exec_drop( + "INSERT INTO `content` SET `channel_item_id` = ?, + `provider_id` = ?, + `title` = ?, + `description` = ?", + (channel_item_id, provider_id, title, description), + )?; + Ok(self.tx.last_insert_id().unwrap()) + } + + pub fn insert_content_image(&mut self, content_id: u64, image_id: u64) -> Result { + self.tx.exec_drop( + "INSERT INTO `content_image` SET `content_id` = ?, `image_id` = ?", + (content_id, image_id), + )?; + Ok(self.tx.last_insert_id().unwrap()) + } + + pub fn images_total_by_source(&mut self, source: &str) -> Result { + Ok(self + .tx + .exec_first("SELECT COUNT(*) FROM `image` WHERE `source` = ?", (source,))? + .unwrap_or(0)) + } + + pub fn insert_image(&mut self, source: &str, data: &[u8]) -> Result { + self.tx.exec_drop( + "INSERT INTO `image` SET `source` = ?, `data` = ?", + (source, data), + )?; + Ok(self.tx.last_insert_id().unwrap()) + } + + pub fn provider_id_by_name(&mut self, name: &str) -> Result, Error> { + self.tx.exec_first( + "SELECT `provider_id` FROM `provider` WHERE `name` = ?", + (name,), + ) + } + + pub fn insert_provider(&mut self, name: &str) -> Result { + self.tx + .exec_drop("INSERT INTO `provider` SET `name` = ?", (name,))?; + Ok(self.tx.last_insert_id().unwrap()) + } +}