separate Pollable and Transactional features, separate table members, use single-connection transactions method in the crawler and llm crates, minor crawler optimizations such as disconnect from db server on each queue iteration complete

This commit is contained in:
yggverse 2026-01-09 22:35:06 +02:00
parent 221b43e4cf
commit f48e256fad
11 changed files with 438 additions and 409 deletions

View file

@ -14,7 +14,7 @@ anyhow = "1.0.100"
chrono = "0.4.42"
clap = { version = "4.5.54", features = ["derive"] }
log = "0.4.29"
mysql = { package = "rssto-mysql", version = "0.1.0", path = "../mysql" }
mysql = { package = "rssto-mysql", version = "0.1.0", features = ["transactional"], path = "../mysql" }
reqwest = { version = "0.13.1", features = ["blocking"] }
rss = "2.0.12"
scraper = { version = "0.25.0", features = ["serde"] }

View file

@ -3,7 +3,7 @@ mod config;
use anyhow::Result;
use log::{debug, info, warn};
use mysql::Mysql;
use mysql::Transactional;
use reqwest::blocking::get;
fn main() -> Result<()> {
@ -28,22 +28,27 @@ fn main() -> Result<()> {
let argument = Argument::parse();
let config: config::Config = toml::from_str(&read_to_string(argument.config)?)?;
let database = Mysql::connect(
&config.mysql.host,
config.mysql.port,
&config.mysql.user,
&config.mysql.password,
&config.mysql.database,
)?;
info!("Crawler started");
loop {
debug!("Begin new crawl queue...");
for c in &config.channel {
debug!("Update `{}`...", c.url);
if let Err(e) = crawl(&database, c) {
warn!("Channel `{}` update failed: `{e}`", c.url)
{
// disconnect from the database immediately when exiting this scope,
// in case the `update` queue is enabled and pending for a while.
let mut db = Transactional::connect(
&config.mysql.host,
config.mysql.port,
&config.mysql.user,
&config.mysql.password,
&config.mysql.database,
)?;
for c in &config.channel {
debug!("Update `{}`...", c.url);
if let Err(e) = crawl(&mut db, c) {
warn!("Channel `{}` update failed: `{e}`", c.url)
}
}
db.commit()?
}
debug!("Crawl queue completed");
if let Some(update) = config.update {
@ -55,7 +60,7 @@ fn main() -> Result<()> {
}
}
fn crawl(db: &Mysql, channel_config: &config::Channel) -> Result<()> {
fn crawl(db: &mut Transactional, channel_config: &config::Channel) -> Result<()> {
use rss::Channel;
use scraper::Selector;
@ -82,8 +87,8 @@ fn crawl(db: &Mysql, channel_config: &config::Channel) -> Result<()> {
let channel_items_limit = channel_config.items_limit.unwrap_or(channel_items.len());
let channel_id = match db.channels_by_url(&channel_url, Some(1))?.first() {
Some(result) => result.channel_id,
let channel_id = match db.channel_id_by_url(&channel_url)? {
Some(channel_id) => channel_id,
None => db.insert_channel(&channel_url)?,
};
@ -115,10 +120,7 @@ fn crawl(db: &Mysql, channel_config: &config::Channel) -> Result<()> {
continue;
}
};
if !db
.channel_items_by_channel_id_guid(channel_id, guid, Some(1))?
.is_empty()
{
if db.channel_items_total_by_channel_id_guid(channel_id, guid)? > 0 {
continue; // skip next steps as processed
}
let channel_item_id = db.insert_channel_item(
@ -186,10 +188,6 @@ fn crawl(db: &Mysql, channel_config: &config::Channel) -> Result<()> {
}
},
};
assert!(
db.contents_by_channel_item_id_provider_id(channel_item_id, None, Some(1))?
.is_empty()
);
let _content_id = db.insert_content(channel_item_id, None, &title, &description)?;
// @TODO preload media
}

View file

@ -11,7 +11,7 @@ use config::Config;
use feed::Feed;
use global::Global;
use meta::Meta;
use mysql::{Mysql, Sort};
use mysql::{Pollable, pollable::Sort};
use rocket::{State, http::Status, response::content::RawXml, serde::Serialize};
use rocket_dyn_templates::{Template, context};
@ -19,7 +19,7 @@ use rocket_dyn_templates::{Template, context};
fn index(
search: Option<&str>,
page: Option<usize>,
db: &State<Mysql>,
db: &State<Pollable>,
meta: &State<Meta>,
global: &State<Global>,
) -> Result<Template, Status> {
@ -92,7 +92,7 @@ fn index(
#[get("/<content_id>")]
fn info(
content_id: u64,
db: &State<Mysql>,
db: &State<Pollable>,
meta: &State<Meta>,
global: &State<Global>,
) -> Result<Template, Status> {
@ -123,7 +123,7 @@ fn rss(
search: Option<&str>,
global: &State<Global>,
meta: &State<Meta>,
db: &State<Mysql>,
db: &State<Pollable>,
) -> Result<RawXml<String>, Status> {
let mut f = Feed::new(
&meta.title,
@ -165,7 +165,7 @@ fn rocket() -> _ {
}
})
.manage(
Mysql::connect(
Pollable::connect(
&config.mysql_host,
config.mysql_port,
&config.mysql_username,

View file

@ -15,6 +15,6 @@ chrono = "0.4.42"
clap = { version = "4.5.54", features = ["derive"] }
lancor = "0.1.1"
log = "0.4.29"
mysql = { package = "rssto-mysql", version = "0.1.0", path = "../mysql" }
mysql = { package = "rssto-mysql", version = "0.1.0", features = ["transactional"], path = "../mysql" }
tokio = { version = "1.0", features = ["full"] }
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }

View file

@ -1,15 +1,16 @@
mod argument;
use anyhow::Result;
use argument::Argument;
use mysql::Transactional;
#[tokio::main]
async fn main() -> Result<()> {
use argument::Argument;
use chrono::Local;
use clap::Parser;
use lancor::{ChatCompletionRequest, LlamaCppClient, Message};
use log::{debug, info};
use mysql::{Mysql, Sort};
use std::env::var;
if var("RUST_LOG").is_ok() {
@ -27,68 +28,73 @@ async fn main() -> Result<()> {
}
let arg = Argument::parse();
let db = Mysql::connect(
&arg.mysql_host,
arg.mysql_port,
&arg.mysql_username,
&arg.mysql_password,
&arg.mysql_database,
)?;
let llm = LlamaCppClient::new(format!(
"{}://{}:{}",
arg.llm_scheme, arg.llm_host, arg.llm_port
))?;
let provider_id = match db.provider_by_name(&arg.llm_model)? {
Some(p) => {
debug!(
"Use existing DB provider #{} matches model name `{}`",
p.provider_id, &arg.llm_model
);
p.provider_id
}
None => {
let provider_id = db.insert_provider(&arg.llm_model)?;
info!(
"Provider `{}` not found in database, created new one with ID `{provider_id}`",
&arg.llm_model
);
provider_id
// find existing ID by model name or create a new one
// * this feature should be moved to a separate CLI tool
let provider_id = {
let mut db = tx(&arg)?;
match db.provider_id_by_name(&arg.llm_model)? {
Some(provider_id) => {
debug!(
"Use existing DB provider #{} matches model name `{}`",
provider_id, &arg.llm_model
);
provider_id
}
None => {
let provider_id = db.insert_provider(&arg.llm_model)?;
info!(
"Provider `{}` not found in database, created new one with ID `{provider_id}`",
&arg.llm_model
);
db.commit()?;
provider_id
}
}
};
info!("Daemon started");
loop {
debug!("New queue begin...");
for source in db.contents_queue_for_provider_id(provider_id, Sort::Asc, None)? {
debug!(
"Begin generating `content_id` #{} using `provider_id` #{provider_id}.",
source.content_id
);
{
// disconnect from the database immediately when exiting this scope,
// in case the `update` queue is enabled and pending for a while.
let mut db = tx(&arg)?;
for source in db.contents_queue_for_provider_id(provider_id)? {
debug!(
"Begin generating `content_id` #{} using `provider_id` #{provider_id}.",
source.content_id
);
let title =
llm.chat_completion(ChatCompletionRequest::new(&arg.llm_model).message(
Message::user(format!("{}\n{}", arg.llm_message, source.title)),
))
.await?;
let title = llm
.chat_completion(ChatCompletionRequest::new(&arg.llm_model).message(
Message::user(format!("{}\n{}", arg.llm_message, source.title)),
))
.await?;
let description =
llm.chat_completion(ChatCompletionRequest::new(&arg.llm_model).message(
Message::user(format!("{}\n{}", arg.llm_message, source.description)),
))
.await?;
let description = llm
.chat_completion(ChatCompletionRequest::new(&arg.llm_model).message(
Message::user(format!("{}\n{}", arg.llm_message, source.description)),
))
.await?;
let content_id = db.insert_content(
source.channel_item_id,
Some(provider_id),
&title.choices[0].message.content,
&description.choices[0].message.content,
)?;
let content_id = db.insert_content(
source.channel_item_id,
Some(provider_id),
&title.choices[0].message.content,
&description.choices[0].message.content,
)?;
debug!(
"Created `content_id` #{content_id} using `content_id` #{} source by `provider_id` #{provider_id}.",
source.content_id
)
debug!(
"Created `content_id` #{content_id} using `content_id` #{} source by `provider_id` #{provider_id}.",
source.content_id
)
}
db.commit()?
}
debug!("Queue completed");
if let Some(update) = arg.update {
@ -99,3 +105,15 @@ async fn main() -> Result<()> {
}
}
}
// in fact, there is no need for a transaction at this moment,
// as there are no related table updates, but who knows what the future holds
fn tx(arg: &Argument) -> Result<Transactional> {
Ok(Transactional::connect(
&arg.mysql_host,
arg.mysql_port,
&arg.mysql_username,
&arg.mysql_password,
&arg.mysql_database,
)?)
}

View file

@ -9,5 +9,10 @@ keywords = ["rssto", "database", "mysql", "library", "driver", "api"]
# categories = []
repository = "https://github.com/YGGverse/rssto"
[features]
default = ["pollable"]
pollable = []
transactional = []
[dependencies]
mysql = "26.0.1"
mysql = "26.0.1"

View file

@ -1,333 +1,13 @@
use mysql::{
Error, Pool,
prelude::{FromRow, Queryable},
};
#[cfg(feature = "pollable")]
pub mod pollable;
pub struct Mysql {
pool: Pool,
}
pub mod table;
impl Mysql {
pub fn connect(
host: &str,
port: u16,
user: &str,
password: &str,
database: &str,
) -> Result<Self, Error> {
Ok(Self {
pool: mysql::Pool::new(
format!("mysql://{user}:{password}@{host}:{port}/{database}").as_str(),
)?,
})
}
#[cfg(feature = "transactional")]
pub mod transactional;
pub fn channels_by_url(&self, url: &str, limit: Option<usize>) -> Result<Vec<Channel>, Error> {
self.pool.get_conn()?.exec(
format!(
"SELECT `channel_id`, `url` FROM `channel` WHERE `url` = ? LIMIT {}",
limit.unwrap_or(DEFAULT_LIMIT)
),
(url,),
)
}
#[cfg(feature = "pollable")]
pub use pollable::Pollable;
pub fn insert_channel(&self, url: &str) -> Result<u64, Error> {
let mut c = self.pool.get_conn()?;
c.exec_drop("INSERT INTO `channel` SET `url` = ?", (url,))?;
Ok(c.last_insert_id())
}
pub fn channel_item(&self, channel_item_id: u64) -> Result<Option<ChannelItem>, Error> {
self.pool.get_conn()?.exec_first(
"SELECT `channel_item_id`,
`channel_id`,
`pub_date`,
`guid`,
`link`,
`title`,
`description` FROM `channel_item` WHERE `channel_item_id` = ?",
(channel_item_id,),
)
}
pub fn channel_items_by_channel_id_guid(
&self,
channel_id: u64,
guid: &str,
limit: Option<usize>,
) -> Result<Vec<ChannelItem>, Error> {
self.pool.get_conn()?.exec(
format!(
"SELECT `channel_item_id`,
`channel_id`,
`pub_date`,
`guid`,
`link`,
`title`,
`description` FROM `channel_item` WHERE `channel_id` = ? AND `guid` = ? LIMIT {}",
limit.unwrap_or(DEFAULT_LIMIT)
),
(channel_id, guid),
)
}
pub fn insert_channel_item(
&self,
channel_id: u64,
pub_date: i64,
guid: &str,
link: &str,
title: Option<&str>,
description: Option<&str>,
) -> Result<u64, Error> {
let mut c = self.pool.get_conn()?;
c.exec_drop(
"INSERT INTO `channel_item` SET `channel_id` = ?,
`pub_date` = ?,
`guid` = ?,
`link` = ?,
`title` = ?,
`description` = ?",
(channel_id, pub_date, guid, link, title, description),
)?;
Ok(c.last_insert_id())
}
pub fn content(&self, content_id: u64) -> Result<Option<Content>, Error> {
self.pool.get_conn()?.exec_first(
"SELECT `content_id`,
`channel_item_id`,
`provider_id`,
`title`,
`description` FROM `content` WHERE `content_id` = ?",
(content_id,),
)
}
pub fn contents_total_by_provider_id(
&self,
provider_id: Option<u64>,
keyword: Option<&str>,
) -> Result<usize, Error> {
let total: Option<usize> = self.pool.get_conn()?.exec_first(
"SELECT COUNT(*) FROM `content` WHERE `provider_id` <=> ? AND `title` LIKE ?",
(provider_id, like(keyword)),
)?;
Ok(total.unwrap_or(0))
}
pub fn contents_by_provider_id(
&self,
provider_id: Option<u64>,
keyword: Option<&str>,
sort: Sort,
limit: Option<usize>,
) -> Result<Vec<Content>, Error> {
self.pool.get_conn()?.exec(format!(
"SELECT `content_id`,
`channel_item_id`,
`provider_id`,
`title`,
`description` FROM `content` WHERE `provider_id` <=> ? AND `title` LIKE ? ORDER BY `content_id` {sort} LIMIT {}",
limit.unwrap_or(DEFAULT_LIMIT)
),
(provider_id, like(keyword), ))
}
/// Get subjects for `rssto-llm` queue
pub fn contents_queue_for_provider_id(
&self,
provider_id: u64,
sort: Sort,
limit: Option<usize>,
) -> Result<Vec<Content>, Error> {
self.pool.get_conn()?.exec(
format!(
"SELECT `c1`.`content_id`,
`c1`.`channel_item_id`,
`c1`.`provider_id`,
`c1`.`title`,
`c1`.`description`
FROM `content` AS `c1` WHERE `c1`.`provider_id` IS NULL AND NOT EXISTS (
SELECT NULL FROM `content` AS `c2` WHERE `c2`.`channel_item_id` = `c1`.`channel_item_id` AND `c2`.`provider_id` = ? LIMIT 1
) ORDER BY `c1`.`content_id` {sort} LIMIT {}",
limit.unwrap_or(DEFAULT_LIMIT)
),
(provider_id,),
)
}
pub fn contents_by_channel_item_id_provider_id(
&self,
channel_item_id: u64,
provider_id: Option<u64>,
limit: Option<usize>,
) -> Result<Vec<Content>, Error> {
self.pool.get_conn()?.exec(
format!(
"SELECT `content_id`,
`channel_item_id`,
`provider_id`,
`title`,
`description` FROM `content`
WHERE `channel_item_id` = ? AND `provider_id` <=> ? LIMIT {}",
limit.unwrap_or(DEFAULT_LIMIT)
),
(channel_item_id, provider_id),
)
}
pub fn insert_content(
&self,
channel_item_id: u64,
provider_id: Option<u64>,
title: &str,
description: &str,
) -> Result<u64, Error> {
let mut c = self.pool.get_conn()?;
c.exec_drop(
"INSERT INTO `content` SET `channel_item_id` = ?,
`provider_id` = ?,
`title` = ?,
`description` = ?",
(channel_item_id, provider_id, title, description),
)?;
Ok(c.last_insert_id())
}
pub fn content_image(&self, content_image_id: u64) -> Result<Option<ContentImage>, Error> {
self.pool.get_conn()?.exec_first(
"SELECT `content_image_id`,
`content_id`,
`image_id`,
`data`,
`source` FROM `content_image`
JOIN `image` ON (`image`.`image_id` = `content_image`.`image_id`)
WHERE `content_image_id` = ? LIMIT 1",
(content_image_id,),
)
}
pub fn insert_content_image(&self, content_id: u64, image_id: u64) -> Result<u64, Error> {
let mut c = self.pool.get_conn()?;
c.exec_drop(
"INSERT INTO `content_image` SET `content_id` = ?, `image_id` = ?",
(content_id, image_id),
)?;
Ok(c.last_insert_id())
}
pub fn image_by_source(&self, source: &str) -> Result<Option<Image>, Error> {
self.pool.get_conn()?.exec_first(
"SELECT `image_id`,
`source`,
`data` FROM `image` WHERE `source` = ? LIMIT 1",
(source,),
)
}
pub fn images(&self, limit: Option<usize>) -> Result<Vec<Image>, Error> {
self.pool.get_conn()?.query(format!(
"SELECT `image_id`, `source`, `data` FROM `image` LIMIT {}",
limit.unwrap_or(DEFAULT_LIMIT)
))
}
pub fn insert_image(&self, source: &str, data: &[u8]) -> Result<u64, Error> {
let mut c = self.pool.get_conn()?;
c.exec_drop(
"INSERT INTO `image` SET `source` = ?, `data` = ?",
(source, data),
)?;
Ok(c.last_insert_id())
}
pub fn provider_by_name(&self, name: &str) -> Result<Option<Provider>, Error> {
self.pool.get_conn()?.exec_first(
"SELECT `provider_id`,
`name`
FROM `provider` WHERE `name` = ?",
(name,),
)
}
pub fn insert_provider(&self, name: &str) -> Result<u64, Error> {
let mut c = self.pool.get_conn()?;
c.exec_drop("INSERT INTO `provider` SET `name` = ?", (name,))?;
Ok(c.last_insert_id())
}
}
#[derive(Debug, PartialEq, Eq, FromRow)]
pub struct Channel {
pub channel_id: u64,
pub url: String,
}
#[derive(Debug, PartialEq, Eq, FromRow)]
pub struct ChannelItem {
pub channel_item_id: u64,
pub channel_id: u64,
pub pub_date: i64,
pub guid: String,
pub link: String,
pub title: Option<String>,
pub description: Option<String>,
}
#[derive(Debug, PartialEq, Eq, FromRow)]
pub struct Content {
pub content_id: u64,
pub channel_item_id: u64,
/// None if the original `title` and `description` values
/// parsed from the channel item on crawl
pub provider_id: Option<u64>,
pub title: String,
pub description: String,
}
#[derive(Debug, PartialEq, Eq, FromRow)]
pub struct Provider {
pub provider_id: u64,
pub name: String,
}
#[derive(Debug, PartialEq, Eq, FromRow)]
pub struct Image {
pub image_id: u64,
pub source: String,
pub data: Vec<u8>,
}
/// Includes joined `image` table members
#[derive(Debug, PartialEq, Eq, FromRow)]
pub struct ContentImage {
pub content_image_id: u64,
pub content_id: u64,
pub image_id: u64,
// Image members (JOIN)
pub data: Vec<u8>,
pub source: String,
}
pub enum Sort {
Asc,
Desc,
}
impl std::fmt::Display for Sort {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::Asc => write!(f, "ASC"),
Self::Desc => write!(f, "DESC"),
}
}
}
/// Shared search logic
fn like(value: Option<&str>) -> String {
value.map_or("%".into(), |k| format!("{k}%"))
}
const DEFAULT_LIMIT: usize = 100;
#[cfg(feature = "transactional")]
pub use transactional::Transactional;

View file

@ -0,0 +1,114 @@
pub mod sort;
pub use sort::Sort;
use crate::table::*;
use mysql::{Error, Pool, prelude::Queryable};
/// Safe, read-only operations used in client apps like `rssto-http`
pub struct Pollable {
pool: Pool,
}
impl Pollable {
pub fn connect(
host: &str,
port: u16,
user: &str,
password: &str,
database: &str,
) -> Result<Self, Error> {
Ok(Self {
pool: mysql::Pool::new(
format!("mysql://{user}:{password}@{host}:{port}/{database}").as_str(),
)?,
})
}
pub fn channel_item(&self, channel_item_id: u64) -> Result<Option<ChannelItem>, Error> {
self.pool.get_conn()?.exec_first(
"SELECT `channel_item_id`,
`channel_id`,
`pub_date`,
`guid`,
`link`,
`title`,
`description` FROM `channel_item` WHERE `channel_item_id` = ?",
(channel_item_id,),
)
}
pub fn content(&self, content_id: u64) -> Result<Option<Content>, Error> {
self.pool.get_conn()?.exec_first(
"SELECT `content_id`,
`channel_item_id`,
`provider_id`,
`title`,
`description` FROM `content` WHERE `content_id` = ?",
(content_id,),
)
}
pub fn contents_total_by_provider_id(
&self,
provider_id: Option<u64>,
keyword: Option<&str>,
) -> Result<usize, Error> {
let total: Option<usize> = self.pool.get_conn()?.exec_first(
"SELECT COUNT(*) FROM `content` WHERE `provider_id` <=> ? AND `title` LIKE ?",
(provider_id, like(keyword)),
)?;
Ok(total.unwrap_or(0))
}
pub fn contents_by_provider_id(
&self,
provider_id: Option<u64>,
keyword: Option<&str>,
sort: Sort,
limit: Option<usize>,
) -> Result<Vec<Content>, Error> {
self.pool.get_conn()?.exec(format!(
"SELECT `content_id`,
`channel_item_id`,
`provider_id`,
`title`,
`description` FROM `content` WHERE `provider_id` <=> ? AND `title` LIKE ? ORDER BY `content_id` {sort} LIMIT {}",
limit.unwrap_or(DEFAULT_LIMIT)
),
(provider_id, like(keyword), ))
}
pub fn content_image(&self, content_image_id: u64) -> Result<Option<ContentImage>, Error> {
self.pool.get_conn()?.exec_first(
"SELECT `content_image_id`,
`content_id`,
`image_id`,
`data`,
`source` FROM `content_image`
JOIN `image` ON (`image`.`image_id` = `content_image`.`image_id`)
WHERE `content_image_id` = ? LIMIT 1",
(content_image_id,),
)
}
pub fn images(&self, limit: Option<usize>) -> Result<Vec<Image>, Error> {
self.pool.get_conn()?.query(format!(
"SELECT `image_id`, `source`, `data` FROM `image` LIMIT {}",
limit.unwrap_or(DEFAULT_LIMIT)
))
}
pub fn insert_provider(&self, name: &str) -> Result<u64, Error> {
let mut c = self.pool.get_conn()?;
c.exec_drop("INSERT INTO `provider` SET `name` = ?", (name,))?;
Ok(c.last_insert_id())
}
}
/// Shared search logic
fn like(value: Option<&str>) -> String {
value.map_or("%".into(), |k| format!("{k}%"))
}
const DEFAULT_LIMIT: usize = 100;

View file

@ -0,0 +1,13 @@
pub enum Sort {
Asc,
Desc,
}
impl std::fmt::Display for Sort {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::Asc => write!(f, "ASC"),
Self::Desc => write!(f, "DESC"),
}
}
}

53
crates/mysql/src/table.rs Normal file
View file

@ -0,0 +1,53 @@
use mysql::prelude::FromRow;
#[derive(Debug, PartialEq, Eq, FromRow)]
pub struct Channel {
pub channel_id: u64,
pub url: String,
}
#[derive(Debug, PartialEq, Eq, FromRow)]
pub struct ChannelItem {
pub channel_item_id: u64,
pub channel_id: u64,
pub pub_date: i64,
pub guid: String,
pub link: String,
pub title: Option<String>,
pub description: Option<String>,
}
#[derive(Debug, PartialEq, Eq, FromRow)]
pub struct Content {
pub content_id: u64,
pub channel_item_id: u64,
/// None if the original `title` and `description` values
/// parsed from the channel item on crawl
pub provider_id: Option<u64>,
pub title: String,
pub description: String,
}
#[derive(Debug, PartialEq, Eq, FromRow)]
pub struct Provider {
pub provider_id: u64,
pub name: String,
}
#[derive(Debug, PartialEq, Eq, FromRow)]
pub struct Image {
pub image_id: u64,
pub source: String,
pub data: Vec<u8>,
}
/// Includes joined `image` table members
#[derive(Debug, PartialEq, Eq, FromRow)]
pub struct ContentImage {
pub content_image_id: u64,
pub content_id: u64,
pub image_id: u64,
// Image members (JOIN)
pub data: Vec<u8>,
pub source: String,
}

View file

@ -0,0 +1,148 @@
use crate::table::*;
use mysql::{Error, Pool, Transaction, TxOpts, prelude::Queryable};
/// Safe, optimized read/write operations
/// mostly required by the `rssto-crawler` and `rssto-llm`
/// * all members implementation requires `commit` action
pub struct Transactional {
tx: Transaction<'static>,
}
impl Transactional {
pub fn connect(
host: &str,
port: u16,
user: &str,
password: &str,
database: &str,
) -> Result<Self, Error> {
Ok(Self {
tx: Pool::new(format!("mysql://{user}:{password}@{host}:{port}/{database}").as_str())?
.start_transaction(TxOpts::default())?,
})
}
pub fn commit(self) -> Result<(), Error> {
self.tx.commit()
}
pub fn channel_id_by_url(&mut self, url: &str) -> Result<Option<u64>, Error> {
self.tx.exec_first(
"SELECT `channel_id` FROM `channel` WHERE `url` = ? LIMIT 1",
(url,),
)
}
pub fn insert_channel(&mut self, url: &str) -> Result<u64, Error> {
self.tx
.exec_drop("INSERT INTO `channel` SET `url` = ?", (url,))?;
Ok(self.tx.last_insert_id().unwrap())
}
pub fn channel_items_total_by_channel_id_guid(
&mut self,
channel_id: u64,
guid: &str,
) -> Result<usize, Error> {
Ok(self
.tx
.exec_first(
"SELECT COUNT(*) FROM `channel_item` WHERE `channel_id` = ? AND `guid` = ?",
(channel_id, guid),
)?
.unwrap_or(0))
}
pub fn insert_channel_item(
&mut self,
channel_id: u64,
pub_date: i64,
guid: &str,
link: &str,
title: Option<&str>,
description: Option<&str>,
) -> Result<u64, Error> {
self.tx.exec_drop(
"INSERT INTO `channel_item` SET `channel_id` = ?,
`pub_date` = ?,
`guid` = ?,
`link` = ?,
`title` = ?,
`description` = ?",
(channel_id, pub_date, guid, link, title, description),
)?;
Ok(self.tx.last_insert_id().unwrap())
}
pub fn contents_queue_for_provider_id(
&mut self,
provider_id: u64,
) -> Result<Vec<Content>, Error> {
self.tx.exec(
"SELECT `c1`.`content_id`,
`c1`.`channel_item_id`,
`c1`.`provider_id`,
`c1`.`title`,
`c1`.`description`
FROM `content` AS `c1` WHERE `c1`.`provider_id` IS NULL AND NOT EXISTS (
SELECT NULL FROM `content` AS `c2`
WHERE `c2`.`channel_item_id` = `c1`.`channel_item_id`
AND `c2`.`provider_id` = ? LIMIT 1
)",
(provider_id,),
)
}
pub fn insert_content(
&mut self,
channel_item_id: u64,
provider_id: Option<u64>,
title: &str,
description: &str,
) -> Result<u64, Error> {
self.tx.exec_drop(
"INSERT INTO `content` SET `channel_item_id` = ?,
`provider_id` = ?,
`title` = ?,
`description` = ?",
(channel_item_id, provider_id, title, description),
)?;
Ok(self.tx.last_insert_id().unwrap())
}
pub fn insert_content_image(&mut self, content_id: u64, image_id: u64) -> Result<u64, Error> {
self.tx.exec_drop(
"INSERT INTO `content_image` SET `content_id` = ?, `image_id` = ?",
(content_id, image_id),
)?;
Ok(self.tx.last_insert_id().unwrap())
}
pub fn images_total_by_source(&mut self, source: &str) -> Result<usize, Error> {
Ok(self
.tx
.exec_first("SELECT COUNT(*) FROM `image` WHERE `source` = ?", (source,))?
.unwrap_or(0))
}
pub fn insert_image(&mut self, source: &str, data: &[u8]) -> Result<u64, Error> {
self.tx.exec_drop(
"INSERT INTO `image` SET `source` = ?, `data` = ?",
(source, data),
)?;
Ok(self.tx.last_insert_id().unwrap())
}
pub fn provider_id_by_name(&mut self, name: &str) -> Result<Option<u64>, Error> {
self.tx.exec_first(
"SELECT `provider_id` FROM `provider` WHERE `name` = ?",
(name,),
)
}
pub fn insert_provider(&mut self, name: &str) -> Result<u64, Error> {
self.tx
.exec_drop("INSERT INTO `provider` SET `name` = ?", (name,))?;
Ok(self.tx.last_insert_id().unwrap())
}
}