import json import sqlite3 import time from contextlib import contextmanager from pathlib import Path from typing import Iterator from backend.app.config import ( ARTICLE_FETCH_TIMEOUT, CRON_HOURS, DB_PATH, LLM_MODEL, LLM_TIMEOUT_SECONDS, MAX_ARTICLE_LENGTH, MIN_CRON_HOURS, OLLAMA_API_TIMEOUT_SECONDS, OLLAMA_HOST, SYNC_COOLDOWN_MINUTES, logger, update_constants_from_db, ) class DatabaseManager: """ Manages database connections and operations for the application. Provides methods for initializing the database, executing queries, and managing transactions. """ def __init__(self, db_path: Path): """ Initialize the database manager with the given database path. Args: db_path: Path to the SQLite database file """ self.db_path = db_path self._initialize_db() def _initialize_db(self) -> None: """ Initialize the database by creating tables if they don't exist. Also seeds initial feeds from seed_feeds.json and settings from global constants. After initialization, updates global constants with values from the database. """ try: schema_file = Path(__file__).parent / "schema.sql" if not schema_file.exists(): logger.error("❌ schema.sql not found") raise FileNotFoundError("schema.sql not found") with open(schema_file, 'r', encoding='utf-8') as f: schema_sql = f.read() with self.get_cursor() as cursor: statements = [stmt.strip() for stmt in schema_sql.split(';') if stmt.strip()] for statement in statements: cursor.execute(statement) cursor.execute("SELECT COUNT(*) FROM feeds") feed_count = cursor.fetchone()[0] if feed_count == 0: self._seed_feeds(cursor) cursor.execute("SELECT COUNT(*) FROM settings") settings_count = cursor.fetchone()[0] if settings_count == 0: self._seed_settings(cursor) settings = self.get_all_settings() update_constants_from_db(settings) except Exception as e: logger.error(f"❌ Failed to initialize database: {e}") raise def get_all_settings(self) -> dict: """ Retrieve all settings from the database. Returns: Dictionary of settings with key-value pairs """ settings = {} try: with self.get_cursor(readonly=True) as cursor: cursor.execute("SELECT key, val FROM settings") for row in cursor.fetchall(): settings[row['key']] = row['val'] return settings except Exception as e: logger.error(f"❌ Failed to retrieve settings from database: {e}") return {} def _seed_feeds(self, cursor) -> None: """ Seed initial feeds from seed_feeds.json file. """ try: seed_file = Path(__file__).parent / "seed_feeds.json" if not seed_file.exists(): logger.warning( "⚠️ seed_feeds.json not found, skipping feed seeding") return with open(seed_file, 'r', encoding='utf-8') as f: feeds_data = json.load(f) for country, urls in feeds_data.items(): for url in urls: cursor.execute( "INSERT OR IGNORE INTO feeds (country, url) VALUES (?, ?)", (country, url)) except Exception as e: logger.error(f"❌ Failed to seed feeds: {e}") def _seed_settings(self, cursor) -> None: """ Seed initial settings from global constants. """ try: settings_data = { 'ollama_host': OLLAMA_HOST, 'min_cron_hours': MIN_CRON_HOURS, 'cron_hours': CRON_HOURS, 'sync_cooldown_minutes': SYNC_COOLDOWN_MINUTES, 'llm_model': LLM_MODEL, 'llm_timeout_seconds': LLM_TIMEOUT_SECONDS, 'ollama_api_timeout_seconds': OLLAMA_API_TIMEOUT_SECONDS, 'article_fetch_timeout': ARTICLE_FETCH_TIMEOUT, 'max_article_length': MAX_ARTICLE_LENGTH } for key, val in settings_data.items(): cursor.execute( "INSERT OR IGNORE INTO settings (key, val) VALUES (?, ?)", (key, str(val)) ) except Exception as e: logger.error(f"❌ Failed to seed settings: {e}") def _get_connection(self) -> sqlite3.Connection: """ Create a thread-safe database connection. Returns: An active SQLite connection """ conn = sqlite3.connect( self.db_path, check_same_thread=False, timeout=30.0 ) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA busy_timeout=30000") conn.execute("PRAGMA synchronous=NORMAL") conn.execute("PRAGMA temp_store=MEMORY") return conn @contextmanager def get_cursor(self, readonly: bool = False) -> Iterator[sqlite3.Cursor]: """ Context manager that provides a database cursor and handles commits and rollbacks. Args: readonly: If True, opens connection in readonly mode for better concurrency Yields: A database cursor for executing SQL statements """ conn = None cursor = None try: conn = self._get_connection() cursor = conn.cursor() if readonly: conn.execute("BEGIN DEFERRED") else: conn.execute("BEGIN IMMEDIATE") yield cursor if not readonly: conn.commit() else: # For readonly transactions, we still need to end the transaction conn.commit() except sqlite3.OperationalError as e: if conn: try: conn.rollback() except: pass if "database is locked" in str(e).lower(): logger.warning( f"⚠️ Database temporarily locked, operation may need retry: {e}") raise e except Exception as e: if conn: try: conn.rollback() except: pass raise e finally: if cursor: try: cursor.close() except: pass if conn: try: conn.close() except: pass @contextmanager def get_cursor_with_retry(self, readonly: bool = False, max_retries: int = 3) -> Iterator[sqlite3.Cursor]: """ Context manager with retry logic for database operations. Args: readonly: If True, opens connection in readonly mode max_retries: Maximum number of retry attempts Yields: A database cursor for executing SQL statements """ for attempt in range(max_retries + 1): try: with self.get_cursor(readonly=readonly) as cursor: yield cursor return except sqlite3.OperationalError as e: if "database is locked" in str( e).lower() and attempt < max_retries: wait_time = (attempt + 1) * 0.1 logger.warning( f"⚠️ Database locked, retrying in {wait_time}s " f"(attempt {attempt + 1}/{max_retries + 1})" ) time.sleep(wait_time) continue raise e db_manager = DatabaseManager(DB_PATH) async def get_db(): """ Dependency that provides a database cursor with retry logic. Yields: A database cursor for executing SQL statements """ with db_manager.get_cursor_with_retry(readonly=True) as cursor: yield cursor async def get_db_write(): """ Dependency that provides a database cursor for write operations with retry logic. Yields: A database cursor for executing SQL statements """ with db_manager.get_cursor_with_retry(readonly=False) as cursor: yield cursor