281 lines
8.7 KiB
Python
281 lines
8.7 KiB
Python
import json
|
|
import sqlite3
|
|
import time
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
from backend.app.config import (
|
|
ARTICLE_FETCH_TIMEOUT,
|
|
CRON_HOURS,
|
|
DB_PATH,
|
|
LLM_MODEL,
|
|
LLM_TIMEOUT_SECONDS,
|
|
MAX_ARTICLE_LENGTH,
|
|
MIN_CRON_HOURS,
|
|
OLLAMA_API_TIMEOUT_SECONDS,
|
|
OLLAMA_HOST,
|
|
SYNC_COOLDOWN_MINUTES,
|
|
logger,
|
|
update_constants_from_db,
|
|
)
|
|
|
|
|
|
class DatabaseManager:
    """
    Manages SQLite connections and operations for the application.

    Every cursor handed out by this class lives on its own short-lived
    connection: the connection is opened, a transaction is started, the
    caller runs its statements, and the connection is committed (or rolled
    back on error) and closed again.
    """

    def __init__(self, db_path: Path):
        """
        Initialize the database manager with the given database path.

        Constructing the manager immediately runs ``_initialize_db``, so the
        schema is applied and seed data is written as a side effect.

        Args:
            db_path: Path to the SQLite database file
        """
        self.db_path = db_path
        self._initialize_db()

    @staticmethod
    def _split_sql_statements(script: str) -> list:
        """
        Split an SQL script into individually executable statements.

        A plain ``script.split(';')`` cuts through semicolons that are part
        of a statement (trigger bodies, string literals). Here fragments are
        glued back together until ``sqlite3.complete_statement`` reports a
        complete statement.

        Args:
            script: Text of the SQL script

        Returns:
            List of stripped statements. Empty statements are dropped; any
            trailing text that lacks a terminating semicolon is kept as the
            final statement.
        """
        statements = []
        fragments = script.split(';')
        pending = ""

        # Every fragment except the last one was followed by a ';'.
        for fragment in fragments[:-1]:
            pending += fragment + ';'
            if not sqlite3.complete_statement(pending):
                continue
            statement = pending.strip()
            pending = ""
            # Skip statements that consist of nothing but semicolons.
            if statement.strip('; \t\r\n'):
                statements.append(statement)

        # Whatever is left never saw a closing ';' - still hand it to SQLite.
        pending += fragments[-1]
        if pending.strip():
            statements.append(pending.strip())

        return statements

    def _initialize_db(self) -> None:
        """
        Initialize the database by creating tables if they don't exist.

        Executes schema.sql (located next to this module), seeds feeds from
        seed_feeds.json when the feeds table is empty, and seeds settings
        from the global constants when the settings table is empty. After
        that, the settings stored in the database are passed to
        ``update_constants_from_db``.

        Raises:
            FileNotFoundError: If schema.sql does not exist.
            Exception: Any other initialization error is logged and re-raised.
        """
        try:
            schema_file = Path(__file__).parent / "schema.sql"
            if not schema_file.exists():
                logger.error("❌ schema.sql not found")
                raise FileNotFoundError("schema.sql not found")

            with open(schema_file, 'r', encoding='utf-8') as f:
                schema_sql = f.read()

            # Schema creation and seeding share one write transaction.
            with self.get_cursor() as cursor:
                for statement in self._split_sql_statements(schema_sql):
                    cursor.execute(statement)

                cursor.execute("SELECT COUNT(*) FROM feeds")
                if cursor.fetchone()[0] == 0:
                    self._seed_feeds(cursor)

                cursor.execute("SELECT COUNT(*) FROM settings")
                if cursor.fetchone()[0] == 0:
                    self._seed_settings(cursor)

            # Read back on a fresh connection, after the transaction above
            # has been committed.
            settings = self.get_all_settings()
            update_constants_from_db(settings)
        except Exception as e:
            logger.error(f"❌ Failed to initialize database: {e}")
            raise

    def get_all_settings(self) -> dict:
        """
        Retrieve all settings from the database.

        Returns:
            Dictionary mapping each settings ``key`` to its ``val``. If the
            query fails, the error is logged and an empty dictionary is
            returned instead of raising.
        """
        try:
            with self.get_cursor(readonly=True) as cursor:
                cursor.execute("SELECT key, val FROM settings")
                return {row['key']: row['val'] for row in cursor.fetchall()}
        except Exception as e:
            logger.error(f"❌ Failed to retrieve settings from database: {e}")
            return {}

    def _seed_feeds(self, cursor) -> None:
        """
        Seed initial feeds from the seed_feeds.json file next to this module.

        The file is iterated as a mapping of country to a list of feed URLs.
        A missing file is logged as a warning and skipped. Any other failure
        is logged and swallowed, so seeding is best-effort.

        Args:
            cursor: Cursor of the surrounding write transaction
        """
        try:
            seed_file = Path(__file__).parent / "seed_feeds.json"

            if not seed_file.exists():
                logger.warning(
                    "⚠️ seed_feeds.json not found, skipping feed seeding")
                return

            with open(seed_file, 'r', encoding='utf-8') as f:
                feeds_data = json.load(f)

            for country, urls in feeds_data.items():
                for url in urls:
                    cursor.execute(
                        "INSERT OR IGNORE INTO feeds (country, url) VALUES (?, ?)", (country, url))

        except Exception as e:
            logger.error(f"❌ Failed to seed feeds: {e}")

    def _seed_settings(self, cursor) -> None:
        """
        Seed initial settings from global constants.

        Each value is stored as its ``str()`` representation. Failures are
        logged and swallowed, so seeding is best-effort.

        Args:
            cursor: Cursor of the surrounding write transaction
        """
        try:
            settings_data = {
                'ollama_host': OLLAMA_HOST,
                'min_cron_hours': MIN_CRON_HOURS,
                'cron_hours': CRON_HOURS,
                'sync_cooldown_minutes': SYNC_COOLDOWN_MINUTES,
                'llm_model': LLM_MODEL,
                'llm_timeout_seconds': LLM_TIMEOUT_SECONDS,
                'ollama_api_timeout_seconds': OLLAMA_API_TIMEOUT_SECONDS,
                'article_fetch_timeout': ARTICLE_FETCH_TIMEOUT,
                'max_article_length': MAX_ARTICLE_LENGTH,
            }

            for key, val in settings_data.items():
                cursor.execute(
                    "INSERT OR IGNORE INTO settings (key, val) VALUES (?, ?)",
                    (key, str(val))
                )

        except Exception as e:
            logger.error(f"❌ Failed to seed settings: {e}")

    def _get_connection(self) -> sqlite3.Connection:
        """
        Open a new, configured SQLite connection.

        The connection is created with ``check_same_thread=False`` (which
        only disables sqlite3's same-thread check) and a 30 second lock
        timeout. Rows are returned as ``sqlite3.Row``, and the connection is
        switched to WAL journaling, ``synchronous=NORMAL`` and in-memory
        temp storage.

        Returns:
            An open SQLite connection; the caller is responsible for closing it
        """
        conn = sqlite3.connect(
            self.db_path,
            check_same_thread=False,
            timeout=30.0
        )
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=30000")
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute("PRAGMA temp_store=MEMORY")
        return conn

    @staticmethod
    def _rollback_quietly(conn) -> None:
        """Roll back ``conn`` if there is one, ignoring SQLite errors."""
        if conn is None:
            return
        try:
            conn.rollback()
        except sqlite3.Error:
            # Best effort: the original error is the one worth reporting.
            pass

    @contextmanager
    def get_cursor(self, readonly: bool = False) -> Iterator[sqlite3.Cursor]:
        """
        Context manager that provides a database cursor and handles commits and rollbacks.

        A new connection is opened for every use. The transaction is
        committed when the ``with`` body finishes normally and rolled back
        when it raises; the cursor and connection are closed in both cases.

        Args:
            readonly: If True, the transaction is started with BEGIN DEFERRED
                instead of BEGIN IMMEDIATE. The connection itself is not
                opened read-only, and the transaction is still committed.

        Yields:
            A database cursor for executing SQL statements

        Raises:
            sqlite3.OperationalError: Re-raised after rollback; "database is
                locked" errors are additionally logged as a warning.
            Exception: Any other error from the ``with`` body, after rollback.
        """
        conn = None
        cursor = None
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            conn.execute("BEGIN DEFERRED" if readonly else "BEGIN IMMEDIATE")

            yield cursor

            # Also needed for readonly use: it ends the open transaction.
            conn.commit()

        except sqlite3.OperationalError as e:
            self._rollback_quietly(conn)
            if "database is locked" in str(e).lower():
                logger.warning(
                    f"⚠️ Database temporarily locked, operation may need retry: {e}")
            raise
        except Exception:
            self._rollback_quietly(conn)
            raise
        finally:
            # Cleanup must never mask the outcome of the transaction.
            for resource in (cursor, conn):
                if resource is None:
                    continue
                try:
                    resource.close()
                except sqlite3.Error:
                    pass

    @contextmanager
    def get_cursor_with_retry(self,
                              readonly: bool = False,
                              max_retries: int = 3) -> Iterator[sqlite3.Cursor]:
        """
        Context manager with retry logic for database operations.

        Only opening the connection and starting the transaction is retried
        on "database is locked". Once the cursor has been handed to the
        caller, any error (including a lock error from the caller's own
        statements or from the commit) is propagated unchanged: a generator
        based context manager cannot re-run the caller's ``with`` body, and
        yielding a second time would raise ``RuntimeError``.

        Args:
            readonly: Passed through to ``get_cursor``
            max_retries: Maximum number of retry attempts; negative values
                are treated as 0 (a single attempt)

        Yields:
            A database cursor for executing SQL statements

        Raises:
            sqlite3.OperationalError: If the database is still locked after
                the last attempt, or for any other operational error.
        """
        total_attempts = max(max_retries, 0) + 1

        for attempt in range(total_attempts):
            cursor_handed_out = False
            try:
                with self.get_cursor(readonly=readonly) as cursor:
                    cursor_handed_out = True
                    yield cursor
                return
            except sqlite3.OperationalError as e:
                can_retry = (
                    not cursor_handed_out
                    and "database is locked" in str(e).lower()
                    and attempt < total_attempts - 1
                )
                if not can_retry:
                    raise

                # Linear back-off: 0.1s, 0.2s, 0.3s, ...
                wait_time = (attempt + 1) * 0.1
                logger.warning(
                    f"⚠️ Database locked, retrying in {wait_time}s "
                    f"(attempt {attempt + 1}/{total_attempts})"
                )
                time.sleep(wait_time)
|
|
|
|
|
|
# Module-level manager instance used by the dependencies below. Constructing
# it calls DatabaseManager._initialize_db(), so importing this module applies
# the schema and seeds the database at DB_PATH.
db_manager = DatabaseManager(DB_PATH)
|
|
|
|
|
|
async def get_db():
    """
    Dependency yielding a cursor opened with ``readonly=True`` and retry logic.

    The cursor comes from ``db_manager.get_cursor_with_retry`` and is only
    valid while this generator is suspended at its ``yield``.

    Yields:
        A database cursor for executing SQL statements
    """
    read_scope = db_manager.get_cursor_with_retry(readonly=True)
    with read_scope as read_cursor:
        yield read_cursor
|
|
|
|
|
|
async def get_db_write():
    """
    Dependency yielding a cursor for write operations, with retry logic.

    The cursor comes from ``db_manager.get_cursor_with_retry`` called with
    ``readonly=False`` and is only valid while this generator is suspended
    at its ``yield``.

    Yields:
        A database cursor for executing SQL statements
    """
    write_scope = db_manager.get_cursor_with_retry(readonly=False)
    with write_scope as write_cursor:
        yield write_cursor
|