added database migration and initialization logic to backend, including migration loader and async migration runner
This commit is contained in:
13
backend-rust/src/db.rs
Normal file
13
backend-rust/src/db.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
use anyhow::Result;
|
||||
use std::path::{Path};
|
||||
use tokio_rusqlite::Connection as AsyncConn;
|
||||
use crate::migrations::Migrator;
|
||||
|
||||
pub async fn initialize_db(db_path: &Path, migrations_dir: &Path) -> Result<AsyncConn> {
|
||||
let conn = AsyncConn::open(db_path).await?;
|
||||
let migrator = Migrator::new(migrations_dir.to_path_buf())?;
|
||||
|
||||
migrator.migrate_up_async(&conn).await?;
|
||||
|
||||
Ok(conn)
|
||||
}
|
22
backend-rust/src/main.rs
Normal file
22
backend-rust/src/main.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
use std::path::Path;
|
||||
|
||||
mod db;
|
||||
mod migrations;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let migrations_folder = String::from("src/migrations");
|
||||
|
||||
let db_path = Path::new("owlynews.sqlite3");
|
||||
let migrations_path = Path::new(&migrations_folder);
|
||||
|
||||
match db::initialize_db(&db_path, migrations_path).await {
|
||||
Ok(_conn) => {
|
||||
println!("Database initialized successfully");
|
||||
// Logic goes here
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Error initializing database: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
245
backend-rust/src/migrations.rs
Normal file
245
backend-rust/src/migrations.rs
Normal file
@@ -0,0 +1,245 @@
|
||||
use anyhow::{Context, Result};
|
||||
use rusqlite::{Connection, params};
|
||||
use std::collections::HashSet;
|
||||
use std::path::PathBuf;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tokio::fs;
|
||||
use tokio_rusqlite::Connection as AsyncConn;
|
||||
|
||||
pub struct Migration {
|
||||
pub version: i64,
|
||||
pub name: String,
|
||||
pub sql_up: String,
|
||||
#[allow(dead_code)]
|
||||
pub sql_down: String,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Migrator {
|
||||
migrations_dir: PathBuf,
|
||||
}
|
||||
|
||||
impl Migrator {
|
||||
pub fn new(migrations_dir: PathBuf) -> Result<Self> {
|
||||
Ok(Migrator { migrations_dir })
|
||||
}
|
||||
|
||||
fn initialize(&self, conn: &mut Connection) -> Result<()> {
|
||||
let tx = conn
|
||||
.transaction()
|
||||
.context("Failed to start transaction for initialization")?;
|
||||
|
||||
tx.execute(
|
||||
"CREATE TABLE IF NOT EXISTS migrations (version INTEGER PRIMARY KEY)",
|
||||
[],
|
||||
)
|
||||
.context("Failed to create migrations table")?;
|
||||
|
||||
let columns: HashSet<String> = {
|
||||
let mut stmt = tx.prepare("PRAGMA table_info(migrations)")?;
|
||||
stmt.query_map([], |row| row.get(1))?
|
||||
.collect::<Result<HashSet<String>, _>>()?
|
||||
};
|
||||
|
||||
if !columns.contains("name") {
|
||||
tx.execute("ALTER TABLE migrations ADD COLUMN name TEXT", [])
|
||||
.context("Failed to add 'name' column to migrations table")?;
|
||||
}
|
||||
if !columns.contains("applied_at") {
|
||||
tx.execute("ALTER TABLE migrations ADD COLUMN applied_at INTEGER", [])
|
||||
.context("Failed to add 'applied_at' column to migrations table")?;
|
||||
}
|
||||
|
||||
tx.commit()
|
||||
.context("Failed to commit migrations table initialization")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn load_migrations_async(&self) -> Result<Vec<Migration>> {
|
||||
let mut migrations = Vec::new();
|
||||
|
||||
// Use async-aware try_exists
|
||||
if !fs::try_exists(&self.migrations_dir).await? {
|
||||
return Ok(migrations);
|
||||
}
|
||||
|
||||
let mut entries = fs::read_dir(&self.migrations_dir)
|
||||
.await
|
||||
.context("Failed to read migrations directory")?;
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let path = entry.path();
|
||||
|
||||
if path.is_file() && path.extension().unwrap_or_default() == "sql" {
|
||||
let file_name = path.file_stem().unwrap().to_string_lossy();
|
||||
|
||||
// Format should be: VERSION_NAME.sql (e.g. 001_create_users.sql
|
||||
if let Some((version_str, name)) = file_name.split_once('_') {
|
||||
if let Ok(version) = version_str.parse::<i64>() {
|
||||
let content = fs::read_to_string(&path).await.with_context(|| {
|
||||
format!("Failed to read migration file: {}", path.display())
|
||||
})?;
|
||||
|
||||
// Split content into up and down migrations if they exist
|
||||
let parts: Vec<&str> = content.split("-- DOWN").collect();
|
||||
let sql_up = parts[0].trim().to_string();
|
||||
let sql_down = parts.get(1).map_or(String::new(), |s| s.trim().to_string());
|
||||
|
||||
migrations.push(Migration {
|
||||
version,
|
||||
name: name.to_string(),
|
||||
sql_up,
|
||||
sql_down,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
migrations.sort_by_key(|m| m.version);
|
||||
Ok(migrations)
|
||||
}
|
||||
|
||||
pub fn get_applied_migrations(&self, conn: &mut Connection) -> Result<HashSet<i64>> {
|
||||
let mut stmt = conn
|
||||
.prepare("SELECT version FROM migrations ORDER BY version")
|
||||
.context("Failed to prepare query for applied migrations")?;
|
||||
let versions = stmt
|
||||
.query_map([], |row| row.get(0))?
|
||||
.collect::<Result<HashSet<i64>, _>>()?;
|
||||
Ok(versions)
|
||||
}
|
||||
|
||||
pub async fn migrate_up_async(&self, async_conn: &AsyncConn) -> Result<()> {
|
||||
let migrations = self.load_migrations_async().await?;
|
||||
let migrator = self.clone();
|
||||
|
||||
// Perform all database operations within a blocking-safe context
|
||||
async_conn
|
||||
.call(move |conn| {
|
||||
migrator.initialize(conn).expect("TODO: panic message");
|
||||
let applied = migrator
|
||||
.get_applied_migrations(conn)
|
||||
.expect("TODO: panic message");
|
||||
|
||||
let tx = conn
|
||||
.transaction()
|
||||
.context("Failed to start transaction for migrations")
|
||||
.expect("TODO: panic message");
|
||||
|
||||
for migration in migrations {
|
||||
if !applied.contains(&migration.version) {
|
||||
println!(
|
||||
"Applying migration {}: {}",
|
||||
migration.version, migration.name
|
||||
);
|
||||
|
||||
tx.execute_batch(&migration.sql_up)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to execute migration {}: {}",
|
||||
migration.version, migration.name
|
||||
)
|
||||
})
|
||||
.expect("TODO: panic message");
|
||||
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("TODO: panic message")
|
||||
.as_secs() as i64;
|
||||
tx.execute(
|
||||
"INSERT INTO migrations (version, name, applied_at) VALUES (?, ?, ?)",
|
||||
params![migration.version, migration.name, now],
|
||||
)
|
||||
.with_context(|| {
|
||||
format!("Failed to record migration {}", migration.version)
|
||||
})
|
||||
.expect("TODO: panic message");
|
||||
}
|
||||
}
|
||||
|
||||
tx.commit()
|
||||
.context("Failed to commit migrations")
|
||||
.expect("TODO: panic message");
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn migrate_down_async(
|
||||
&self,
|
||||
async_conn: &AsyncConn,
|
||||
target_version: Option<i64>,
|
||||
) -> Result<()> {
|
||||
let migrations = self.load_migrations_async().await?;
|
||||
let migrator = self.clone();
|
||||
|
||||
// Perform all database operations within a blocking-safe context
|
||||
async_conn
|
||||
.call(move |conn| {
|
||||
migrator.initialize(conn).expect("TODO: panic message");
|
||||
let applied = migrator
|
||||
.get_applied_migrations(conn)
|
||||
.expect("TODO: panic message");
|
||||
|
||||
// If no target specified, roll back only the latest migration
|
||||
let max_applied = *applied.iter().max().unwrap_or(&0);
|
||||
let target =
|
||||
target_version.unwrap_or(if max_applied > 0 { max_applied - 1 } else { 0 });
|
||||
|
||||
let tx = conn
|
||||
.transaction()
|
||||
.context("Failed to start transaction for migrations")
|
||||
.expect("TODO: panic message");
|
||||
|
||||
// Find migrations to roll back (in reverse order)
|
||||
let mut to_rollback: Vec<&Migration> = migrations
|
||||
.iter()
|
||||
.filter(|m| applied.contains(&m.version) && m.version > target)
|
||||
.collect();
|
||||
|
||||
to_rollback.sort_by_key(|m| std::cmp::Reverse(m.version));
|
||||
|
||||
for migration in to_rollback {
|
||||
println!(
|
||||
"Rolling back migration {}: {}",
|
||||
migration.version, migration.name
|
||||
);
|
||||
|
||||
if !migration.sql_down.is_empty() {
|
||||
tx.execute_batch(&migration.sql_down)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to rollback migration {}: {}",
|
||||
migration.version, migration.name
|
||||
)
|
||||
})
|
||||
.expect("TODO: panic message");
|
||||
} else {
|
||||
println!("Warning: No down migration defined for {}", migration.name);
|
||||
}
|
||||
|
||||
// Remove the migration record
|
||||
tx.execute(
|
||||
"DELETE FROM migrations WHERE version = ?",
|
||||
[&migration.version],
|
||||
)
|
||||
.with_context(|| {
|
||||
format!("Failed to remove migration record {}", migration.version)
|
||||
})
|
||||
.expect("TODO: panic message");
|
||||
}
|
||||
|
||||
tx.commit()
|
||||
.context("Failed to commit rollback")
|
||||
.expect("TODO: panic message");
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
45
backend-rust/src/migrations/001_initial_schema.sql
Normal file
45
backend-rust/src/migrations/001_initial_schema.sql
Normal file
@@ -0,0 +1,45 @@
|
||||
-- Initial database schema for Owly News Summariser
|
||||
|
||||
-- News table to store articles
|
||||
CREATE TABLE IF NOT EXISTS news
|
||||
(
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
title TEXT NOT NULL,
|
||||
summary TEXT,
|
||||
url TEXT NOT NULL,
|
||||
published TIMESTAMP NOT NULL,
|
||||
country TEXT NOT NULL,
|
||||
created_at INTEGER DEFAULT (strftime('%s', 'now'))
|
||||
);
|
||||
|
||||
-- Index for faster queries on published date
|
||||
CREATE INDEX IF NOT EXISTS idx_news_published ON news (published);
|
||||
|
||||
-- Feeds table to store RSS feed sources
|
||||
CREATE TABLE IF NOT EXISTS feeds
|
||||
(
|
||||
id INTEGER PRIMARY KEY,
|
||||
country TEXT,
|
||||
url TEXT UNIQUE NOT NULL
|
||||
);
|
||||
|
||||
-- Settings table for application configuration
|
||||
CREATE TABLE IF NOT EXISTS settings
|
||||
(
|
||||
key TEXT PRIMARY KEY,
|
||||
val TEXT NOT NULL
|
||||
);
|
||||
|
||||
-- Meta table for application metadata
|
||||
CREATE TABLE IF NOT EXISTS meta
|
||||
(
|
||||
key TEXT PRIMARY KEY,
|
||||
val TEXT NOT NULL
|
||||
);
|
||||
|
||||
-- DOWN
|
||||
DROP TABLE IF EXISTS meta;
|
||||
DROP TABLE IF EXISTS settings;
|
||||
DROP TABLE IF EXISTS feeds;
|
||||
DROP INDEX IF EXISTS idx_news_published;
|
||||
DROP TABLE IF EXISTS news;
|
23
backend-rust/src/migrations/002_add_category_to_news.sql
Normal file
23
backend-rust/src/migrations/002_add_category_to_news.sql
Normal file
@@ -0,0 +1,23 @@
|
||||
-- Add category field to news table
|
||||
ALTER TABLE news
|
||||
ADD COLUMN category TEXT;
|
||||
|
||||
-- DOWN
|
||||
CREATE TABLE news_backup
|
||||
(
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
title TEXT NOT NULL,
|
||||
summary TEXT,
|
||||
url TEXT NOT NULL,
|
||||
published TIMESTAMP NOT NULL,
|
||||
country TEXT NOT NULL,
|
||||
created_at INTEGER DEFAULT (strftime('%s', 'now'))
|
||||
);
|
||||
|
||||
INSERT INTO news_backup
|
||||
SELECT id, title, summary, url, published, country, created_at
|
||||
FROM news;
|
||||
DROP TABLE news;
|
||||
ALTER TABLE news_backup
|
||||
RENAME TO news;
|
||||
CREATE INDEX IF NOT EXISTS idx_news_published ON news (published);
|
Reference in New Issue
Block a user