diff --git a/backend/.gitignore b/backend/.gitignore index 364a7af..676e80f 100644 --- a/backend/.gitignore +++ b/backend/.gitignore @@ -56,3 +56,5 @@ logs/ *.swo /owlynews.sqlite-shm /owlynews.sqlite-wal +/owlynews.sqlite3-shm +/owlynews.sqlite3-wal diff --git a/backend/app/config.py b/backend/app/config.py new file mode 100644 index 0000000..4863f1b --- /dev/null +++ b/backend/app/config.py @@ -0,0 +1,110 @@ +from pathlib import Path +import os +import logging + +DB_PATH = Path(os.getenv("DB_NAME", "owlynews.sqlite3")) +OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434") +MIN_CRON_HOURS = float(os.getenv("MIN_CRON_HOURS", 0.5)) +DEFAULT_CRON_HOURS = float(os.getenv("CRON_HOURS", MIN_CRON_HOURS)) +CRON_HOURS = max(MIN_CRON_HOURS, DEFAULT_CRON_HOURS) +SYNC_COOLDOWN_MINUTES = int(os.getenv("SYNC_COOLDOWN_MINUTES", 30)) +LLM_MODEL = os.getenv("LLM_MODEL", "qwen2:7b-instruct-q4_K_M") +LLM_TIMEOUT_SECONDS = int(os.getenv("LLM_TIMEOUT_SECONDS", 180)) +OLLAMA_API_TIMEOUT_SECONDS = int(os.getenv("OLLAMA_API_TIMEOUT_SECONDS", 10)) +ARTICLE_FETCH_TIMEOUT = int(os.getenv("ARTICLE_FETCH_TIMEOUT", 30)) +MAX_ARTICLE_LENGTH = int(os.getenv("MAX_ARTICLE_LENGTH", 5000)) + +frontend_path = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(__file__))), + "frontend", + "dist" +) + +logging.basicConfig( + level=logging.WARNING, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def update_constants_from_db(settings_dict): + """ + Update global constants with values from the database settings. + Environment variables take precedence over database settings. + + Args: + settings_dict: Dictionary of settings from the database + """ + global OLLAMA_HOST, MIN_CRON_HOURS, CRON_HOURS, SYNC_COOLDOWN_MINUTES + global LLM_MODEL, LLM_TIMEOUT_SECONDS, OLLAMA_API_TIMEOUT_SECONDS + global ARTICLE_FETCH_TIMEOUT, MAX_ARTICLE_LENGTH + + if 'ollama_host' in settings_dict and os.getenv("OLLAMA_HOST") is None: + OLLAMA_HOST = settings_dict['ollama_host'] + + if 'min_cron_hours' in settings_dict and os.getenv("MIN_CRON_HOURS") is None: + try: + MIN_CRON_HOURS = float(settings_dict['min_cron_hours']) + except (ValueError, TypeError): + logger.warning( + f"⚠️ Invalid min_cron_hours value in DB: " + f"{settings_dict['min_cron_hours']}" + ) + + if 'cron_hours' in settings_dict and os.getenv("CRON_HOURS") is None: + try: + cron_hours_value = float(settings_dict['cron_hours']) + CRON_HOURS = max(MIN_CRON_HOURS, cron_hours_value) + except (ValueError, TypeError): + logger.warning( + f"⚠️ Invalid cron_hours value in DB: " + f"{settings_dict['cron_hours']}" + ) + + if 'sync_cooldown_minutes' in settings_dict and os.getenv("SYNC_COOLDOWN_MINUTES") is None: + try: + SYNC_COOLDOWN_MINUTES = int(settings_dict['sync_cooldown_minutes']) + except (ValueError, TypeError): + logger.warning( + f"⚠️ Invalid sync_cooldown_minutes value in DB: " + f"{settings_dict['sync_cooldown_minutes']}" + ) + + if 'llm_model' in settings_dict and os.getenv("LLM_MODEL") is None: + LLM_MODEL = settings_dict['llm_model'] + + if 'llm_timeout_seconds' in settings_dict and os.getenv("LLM_TIMEOUT_SECONDS") is None: + try: + LLM_TIMEOUT_SECONDS = int(settings_dict['llm_timeout_seconds']) + except (ValueError, TypeError): + logger.warning( + f"⚠️ Invalid llm_timeout_seconds value in DB: " + f"{settings_dict['llm_timeout_seconds']}" + ) + + if 'ollama_api_timeout_seconds' in settings_dict and os.getenv("OLLAMA_API_TIMEOUT_SECONDS") is None: + try: + OLLAMA_API_TIMEOUT_SECONDS = 
int(settings_dict['ollama_api_timeout_seconds']) + except (ValueError, TypeError): + logger.warning( + f"⚠️ Invalid ollama_api_timeout_seconds value in DB: " + f"{settings_dict['ollama_api_timeout_seconds']}" + ) + + if 'article_fetch_timeout' in settings_dict and os.getenv("ARTICLE_FETCH_TIMEOUT") is None: + try: + ARTICLE_FETCH_TIMEOUT = int(settings_dict['article_fetch_timeout']) + except (ValueError, TypeError): + logger.warning( + f"⚠️ Invalid article_fetch_timeout value in DB: " + f"{settings_dict['article_fetch_timeout']}" + ) + + if 'max_article_length' in settings_dict and os.getenv("MAX_ARTICLE_LENGTH") is None: + try: + MAX_ARTICLE_LENGTH = int(settings_dict['max_article_length']) + except (ValueError, TypeError): + logger.warning( + f"⚠️ Invalid max_article_length value in DB: " + f"{settings_dict['max_article_length']}" + ) diff --git a/backend/app/database.py b/backend/app/database.py new file mode 100644 index 0000000..fd503b2 --- /dev/null +++ b/backend/app/database.py @@ -0,0 +1,248 @@ +from contextlib import contextmanager +from pathlib import Path +import sqlite3 +from typing import Iterator + +from backend.app.config import logger, DB_PATH, update_constants_from_db, OLLAMA_HOST, CRON_HOURS, MIN_CRON_HOURS, \ + SYNC_COOLDOWN_MINUTES, LLM_MODEL, LLM_TIMEOUT_SECONDS, OLLAMA_API_TIMEOUT_SECONDS, ARTICLE_FETCH_TIMEOUT, \ + MAX_ARTICLE_LENGTH + + +class DatabaseManager: + """ + Manages database connections and operations for the application. + Provides methods for initializing the database, executing queries, + and managing transactions. + """ + + def __init__(self, db_path: Path): + """ + Initialize the database manager with the given database path. + + Args: + db_path: Path to the SQLite database file + """ + self.db_path = db_path + self._initialize_db() + + def _initialize_db(self) -> None: + """ + Initialize the database by creating tables if they don't exist. + Also seeds initial feeds from seed_feeds.json and settings from global constants. + After initialization, updates global constants with values from the database. + """ + try: + schema_file = Path(__file__).parent / "schema.sql" + if not schema_file.exists(): + logger.error("❌ schema.sql not found") + raise FileNotFoundError("schema.sql not found") + + with open(schema_file, 'r', encoding='utf-8') as f: + schema_sql = f.read() + + with self.get_cursor() as cursor: + statements = [stmt.strip() for stmt in schema_sql.split(';') if stmt.strip()] + for statement in statements: + cursor.execute(statement) + + cursor.execute("SELECT COUNT(*) FROM feeds") + feed_count = cursor.fetchone()[0] + + if feed_count == 0: + self._seed_feeds(cursor) + + cursor.execute("SELECT COUNT(*) FROM settings") + settings_count = cursor.fetchone()[0] + + if settings_count == 0: + self._seed_settings(cursor) + + settings = self.get_all_settings() + update_constants_from_db(settings) + except Exception as e: + logger.error(f"❌ Failed to initialize database: {e}") + raise + + def get_all_settings(self) -> dict: + """ + Retrieve all settings from the database. + + Returns: + Dictionary of settings with key-value pairs + """ + settings = {} + try: + with self.get_cursor(readonly=True) as cursor: + cursor.execute("SELECT key, val FROM settings") + for row in cursor.fetchall(): + settings[row['key']] = row['val'] + return settings + except Exception as e: + logger.error(f"❌ Failed to retrieve settings from database: {e}") + return {} + + def _seed_feeds(self, cursor) -> None: + """ + Seed initial feeds from seed_feeds.json file. 
+ """ + import json + from pathlib import Path + + try: + seed_file = Path(__file__).parent / "seed_feeds.json" + + if not seed_file.exists(): + logger.warning("⚠️ seed_feeds.json not found, skipping feed seeding") + return + + with open(seed_file, 'r', encoding='utf-8') as f: + feeds_data = json.load(f) + + for country, urls in feeds_data.items(): + for url in urls: + cursor.execute( + "INSERT OR IGNORE INTO feeds (country, url) VALUES (?, ?)", + (country, url) + ) + + + except Exception as e: + logger.error(f"❌ Failed to seed feeds: {e}") + + def _seed_settings(self, cursor) -> None: + """ + Seed initial settings from global constants. + """ + try: + settings_data = { + 'ollama_host': OLLAMA_HOST, + 'min_cron_hours': MIN_CRON_HOURS, + 'cron_hours': CRON_HOURS, + 'sync_cooldown_minutes': SYNC_COOLDOWN_MINUTES, + 'llm_model': LLM_MODEL, + 'llm_timeout_seconds': LLM_TIMEOUT_SECONDS, + 'ollama_api_timeout_seconds': OLLAMA_API_TIMEOUT_SECONDS, + 'article_fetch_timeout': ARTICLE_FETCH_TIMEOUT, + 'max_article_length': MAX_ARTICLE_LENGTH + } + + for key, val in settings_data.items(): + cursor.execute( + "INSERT OR IGNORE INTO settings (key, val) VALUES (?, ?)", + (key, str(val)) + ) + + except Exception as e: + logger.error(f"❌ Failed to seed settings: {e}") + + def _get_connection(self) -> sqlite3.Connection: + """ + Create a thread-safe database connection. + + Returns: + An active SQLite connection + """ + conn = sqlite3.connect( + self.db_path, + check_same_thread=False, + timeout=30.0 + ) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=30000") + conn.execute("PRAGMA synchronous=NORMAL") + conn.execute("PRAGMA temp_store=MEMORY") + return conn + + @contextmanager + def get_cursor(self, readonly: bool = False) -> Iterator[sqlite3.Cursor]: + """ + Context manager that provides a database cursor and handles commits and rollbacks. + + Args: + readonly: If True, opens connection in readonly mode for better concurrency + + Yields: + A database cursor for executing SQL statements + """ + conn = None + try: + conn = self._get_connection() + + if readonly: + conn.execute("BEGIN DEFERRED") + + cursor = conn.cursor() + yield cursor + + if not readonly: + conn.commit() + except sqlite3.OperationalError as e: + if conn: + conn.rollback() + if "database is locked" in str(e).lower(): + logger.warning( + f"⚠️ Database temporarily locked, operation may need retry: {e}" + ) + raise e + except Exception as e: + if conn: + conn.rollback() + raise e + finally: + if conn: + conn.close() + + @contextmanager + def get_cursor_with_retry(self, readonly: bool = False, max_retries: int = 3) -> Iterator[sqlite3.Cursor]: + """ + Context manager with retry logic for database operations. + + Args: + readonly: If True, opens connection in readonly mode + max_retries: Maximum number of retry attempts + + Yields: + A database cursor for executing SQL statements + """ + for attempt in range(max_retries + 1): + try: + with self.get_cursor(readonly=readonly) as cursor: + yield cursor + return + except sqlite3.OperationalError as e: + if "database is locked" in str(e).lower() and attempt < max_retries: + wait_time = (attempt + 1) * 0.1 + logger.warning( + f"⚠️ Database locked, retrying in {wait_time}s " + f"(attempt {attempt + 1}/{max_retries + 1})" + ) + import time + time.sleep(wait_time) + continue + raise e + + +db_manager = DatabaseManager(DB_PATH) + + +async def get_db(): + """ + Dependency that provides a database cursor with retry logic. 
+ + Yields: + A database cursor for executing SQL statements + """ + with db_manager.get_cursor_with_retry(readonly=True) as cursor: + yield cursor + + +async def get_db_write(): + """ + Dependency that provides a database cursor for write operations with retry logic. + + Yields: + A database cursor for executing SQL statements + """ + with db_manager.get_cursor_with_retry(readonly=False) as cursor: + yield cursor diff --git a/backend/app/main.py b/backend/app/main.py index 9537ad7..c0a5c71 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -10,126 +10,25 @@ The application uses SQLite for data storage and APScheduler for scheduling peri # Standard library imports import asyncio -import json -import logging import os -import re import sqlite3 -from contextlib import contextmanager from datetime import datetime, timedelta, timezone from http.client import HTTPException -from pathlib import Path -from typing import Any, Dict, Iterator, List, Optional, Tuple, TypedDict, Union, cast +from typing import Any, Dict, List, Union # Third-party imports -import feedparser import httpx from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger -from bs4 import BeautifulSoup from fastapi import Depends, FastAPI, Response, status from fastapi.staticfiles import StaticFiles -from pydantic import BaseModel -DB_PATH = Path(os.getenv("DB_NAME", "owlynews.sqlite3")) -OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434") -MIN_CRON_HOURS = float(os.getenv("MIN_CRON_HOURS", 0.5)) -DEFAULT_CRON_HOURS = float(os.getenv("CRON_HOURS", MIN_CRON_HOURS)) -CRON_HOURS = max(MIN_CRON_HOURS, DEFAULT_CRON_HOURS) -SYNC_COOLDOWN_MINUTES = int(os.getenv("SYNC_COOLDOWN_MINUTES", 30)) -LLM_MODEL = os.getenv("LLM_MODEL", "qwen2:7b-instruct-q4_K_M") -LLM_TIMEOUT_SECONDS = int(os.getenv("LLM_TIMEOUT_SECONDS", 180)) -OLLAMA_API_TIMEOUT_SECONDS = int(os.getenv("OLLAMA_API_TIMEOUT_SECONDS", 10)) -ARTICLE_FETCH_TIMEOUT = int(os.getenv("ARTICLE_FETCH_TIMEOUT", 30)) -MAX_ARTICLE_LENGTH = int(os.getenv("MAX_ARTICLE_LENGTH", 5000)) - -def update_constants_from_db(settings_dict): - """ - Update global constants with values from the database settings. - Environment variables take precedence over database settings. 
- - Args: - settings_dict: Dictionary of settings from the database - """ - global OLLAMA_HOST, MIN_CRON_HOURS, CRON_HOURS, SYNC_COOLDOWN_MINUTES - global LLM_MODEL, LLM_TIMEOUT_SECONDS, OLLAMA_API_TIMEOUT_SECONDS - global ARTICLE_FETCH_TIMEOUT, MAX_ARTICLE_LENGTH - - if 'ollama_host' in settings_dict and os.getenv("OLLAMA_HOST") is None: - OLLAMA_HOST = settings_dict['ollama_host'] - - if 'min_cron_hours' in settings_dict and os.getenv("MIN_CRON_HOURS") is None: - try: - MIN_CRON_HOURS = float(settings_dict['min_cron_hours']) - except (ValueError, TypeError): - logger.warning( - f"⚠️ Invalid min_cron_hours value in DB: " - f"{settings_dict['min_cron_hours']}" - ) - - if 'cron_hours' in settings_dict and os.getenv("CRON_HOURS") is None: - try: - cron_hours_value = float(settings_dict['cron_hours']) - CRON_HOURS = max(MIN_CRON_HOURS, cron_hours_value) - except (ValueError, TypeError): - logger.warning( - f"⚠️ Invalid cron_hours value in DB: " - f"{settings_dict['cron_hours']}" - ) - - if 'sync_cooldown_minutes' in settings_dict and os.getenv("SYNC_COOLDOWN_MINUTES") is None: - try: - SYNC_COOLDOWN_MINUTES = int(settings_dict['sync_cooldown_minutes']) - except (ValueError, TypeError): - logger.warning( - f"⚠️ Invalid sync_cooldown_minutes value in DB: " - f"{settings_dict['sync_cooldown_minutes']}" - ) - - if 'llm_model' in settings_dict and os.getenv("LLM_MODEL") is None: - LLM_MODEL = settings_dict['llm_model'] - - if 'llm_timeout_seconds' in settings_dict and os.getenv("LLM_TIMEOUT_SECONDS") is None: - try: - LLM_TIMEOUT_SECONDS = int(settings_dict['llm_timeout_seconds']) - except (ValueError, TypeError): - logger.warning( - f"⚠️ Invalid llm_timeout_seconds value in DB: " - f"{settings_dict['llm_timeout_seconds']}" - ) - - if 'ollama_api_timeout_seconds' in settings_dict and os.getenv("OLLAMA_API_TIMEOUT_SECONDS") is None: - try: - OLLAMA_API_TIMEOUT_SECONDS = int(settings_dict['ollama_api_timeout_seconds']) - except (ValueError, TypeError): - logger.warning( - f"⚠️ Invalid ollama_api_timeout_seconds value in DB: " - f"{settings_dict['ollama_api_timeout_seconds']}" - ) - - if 'article_fetch_timeout' in settings_dict and os.getenv("ARTICLE_FETCH_TIMEOUT") is None: - try: - ARTICLE_FETCH_TIMEOUT = int(settings_dict['article_fetch_timeout']) - except (ValueError, TypeError): - logger.warning( - f"⚠️ Invalid article_fetch_timeout value in DB: " - f"{settings_dict['article_fetch_timeout']}" - ) - - if 'max_article_length' in settings_dict and os.getenv("MAX_ARTICLE_LENGTH") is None: - try: - MAX_ARTICLE_LENGTH = int(settings_dict['max_article_length']) - except (ValueError, TypeError): - logger.warning( - f"⚠️ Invalid max_article_length value in DB: " - f"{settings_dict['max_article_length']}" - ) - -logging.basicConfig( - level=logging.WARNING, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) +from backend.app.config import logger, OLLAMA_HOST, CRON_HOURS, MIN_CRON_HOURS, \ + SYNC_COOLDOWN_MINUTES, LLM_MODEL, OLLAMA_API_TIMEOUT_SECONDS, frontend_path +from backend.app.database import get_db, get_db_write +from backend.app.models import TimestampResponse, SuccessResponse, FeedData, ModelStatus, ErrorResponse, HoursResponse, \ + CronSettings +from backend.app.services import NewsFetcher app = FastAPI( title="Owly News Summariser", @@ -137,597 +36,6 @@ app = FastAPI( version="1.0.0" ) - -class DatabaseManager: - """ - Manages database connections and operations for the application. 
- Provides methods for initializing the database, executing queries, - and managing transactions. - """ - - def __init__(self, db_path: Path): - """ - Initialize the database manager with the given database path. - - Args: - db_path: Path to the SQLite database file - """ - self.db_path = db_path - self._initialize_db() - - def _initialize_db(self) -> None: - """ - Initialize the database by creating tables if they don't exist. - Also seeds initial feeds from seed_feeds.json and settings from global constants. - After initialization, updates global constants with values from the database. - """ - try: - schema_file = Path(__file__).parent / "schema.sql" - if not schema_file.exists(): - logger.error("❌ schema.sql not found") - raise FileNotFoundError("schema.sql not found") - - with open(schema_file, 'r', encoding='utf-8') as f: - schema_sql = f.read() - - with self.get_cursor() as cursor: - statements = [stmt.strip() for stmt in schema_sql.split(';') if stmt.strip()] - for statement in statements: - cursor.execute(statement) - - cursor.execute("SELECT COUNT(*) FROM feeds") - feed_count = cursor.fetchone()[0] - - if feed_count == 0: - self._seed_feeds(cursor) - - cursor.execute("SELECT COUNT(*) FROM settings") - settings_count = cursor.fetchone()[0] - - if settings_count == 0: - self._seed_settings(cursor) - - settings = self.get_all_settings() - update_constants_from_db(settings) - except Exception as e: - logger.error(f"❌ Failed to initialize database: {e}") - raise - - def get_all_settings(self) -> dict: - """ - Retrieve all settings from the database. - - Returns: - Dictionary of settings with key-value pairs - """ - settings = {} - try: - with self.get_cursor(readonly=True) as cursor: - cursor.execute("SELECT key, val FROM settings") - for row in cursor.fetchall(): - settings[row['key']] = row['val'] - return settings - except Exception as e: - logger.error(f"❌ Failed to retrieve settings from database: {e}") - return {} - - def _seed_feeds(self, cursor) -> None: - """ - Seed initial feeds from seed_feeds.json file. - """ - import json - from pathlib import Path - - try: - seed_file = Path(__file__).parent / "seed_feeds.json" - - if not seed_file.exists(): - logger.warning("⚠️ seed_feeds.json not found, skipping feed seeding") - return - - with open(seed_file, 'r', encoding='utf-8') as f: - feeds_data = json.load(f) - - for country, urls in feeds_data.items(): - for url in urls: - cursor.execute( - "INSERT OR IGNORE INTO feeds (country, url) VALUES (?, ?)", - (country, url) - ) - - - except Exception as e: - logger.error(f"❌ Failed to seed feeds: {e}") - - def _seed_settings(self, cursor) -> None: - """ - Seed initial settings from global constants. - """ - try: - settings_data = { - 'ollama_host': OLLAMA_HOST, - 'min_cron_hours': MIN_CRON_HOURS, - 'cron_hours': CRON_HOURS, - 'sync_cooldown_minutes': SYNC_COOLDOWN_MINUTES, - 'llm_model': LLM_MODEL, - 'llm_timeout_seconds': LLM_TIMEOUT_SECONDS, - 'ollama_api_timeout_seconds': OLLAMA_API_TIMEOUT_SECONDS, - 'article_fetch_timeout': ARTICLE_FETCH_TIMEOUT, - 'max_article_length': MAX_ARTICLE_LENGTH - } - - for key, val in settings_data.items(): - cursor.execute( - "INSERT OR IGNORE INTO settings (key, val) VALUES (?, ?)", - (key, str(val)) - ) - - except Exception as e: - logger.error(f"❌ Failed to seed settings: {e}") - - def _get_connection(self) -> sqlite3.Connection: - """ - Create a thread-safe database connection. 
- - Returns: - An active SQLite connection - """ - conn = sqlite3.connect( - self.db_path, - check_same_thread=False, - timeout=30.0 - ) - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA journal_mode=WAL") - conn.execute("PRAGMA busy_timeout=30000") - conn.execute("PRAGMA synchronous=NORMAL") - conn.execute("PRAGMA temp_store=MEMORY") - return conn - - @contextmanager - def get_cursor(self, readonly: bool = False) -> Iterator[sqlite3.Cursor]: - """ - Context manager that provides a database cursor and handles commits and rollbacks. - - Args: - readonly: If True, opens connection in readonly mode for better concurrency - - Yields: - A database cursor for executing SQL statements - """ - conn = None - try: - conn = self._get_connection() - - if readonly: - conn.execute("BEGIN DEFERRED") - - cursor = conn.cursor() - yield cursor - - if not readonly: - conn.commit() - except sqlite3.OperationalError as e: - if conn: - conn.rollback() - if "database is locked" in str(e).lower(): - logger.warning( - f"⚠️ Database temporarily locked, operation may need retry: {e}" - ) - raise e - except Exception as e: - if conn: - conn.rollback() - raise e - finally: - if conn: - conn.close() - - @contextmanager - def get_cursor_with_retry(self, readonly: bool = False, max_retries: int = 3) -> Iterator[sqlite3.Cursor]: - """ - Context manager with retry logic for database operations. - - Args: - readonly: If True, opens connection in readonly mode - max_retries: Maximum number of retry attempts - - Yields: - A database cursor for executing SQL statements - """ - for attempt in range(max_retries + 1): - try: - with self.get_cursor(readonly=readonly) as cursor: - yield cursor - return - except sqlite3.OperationalError as e: - if "database is locked" in str(e).lower() and attempt < max_retries: - wait_time = (attempt + 1) * 0.1 - logger.warning( - f"⚠️ Database locked, retrying in {wait_time}s " - f"(attempt {attempt + 1}/{max_retries + 1})" - ) - import time - time.sleep(wait_time) - continue - raise e - - -db_manager = DatabaseManager(DB_PATH) - - -class ArticleSummary(TypedDict): - """Type definition for article summary data returned from the LLM.""" - title: str - summary_de: str - summary_en: str - - -class NewsFetcher: - """ - Handles fetching and summarizing news articles from RSS feeds. - Uses Ollama/qwen to generate summaries of articles. - """ - - @staticmethod - async def fetch_article_content(client: httpx.AsyncClient, url: str) -> str: - """ - Fetch and extract the main content from an article URL. 
- - Args: - client: An active httpx AsyncClient for making requests - url: URL of the article to fetch - - Returns: - Extracted text content from the article, or empty string if failed - """ - try: - headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' - 'AppleWebKit/537.36 (KHTML, like Gecko) ' - 'Chrome/91.0.4472.124 Safari/537.36' - } - - response = await client.get( - url, - headers=headers, - timeout=ARTICLE_FETCH_TIMEOUT, - follow_redirects=True - ) - - response.raise_for_status() - - soup = BeautifulSoup(response.text, 'html.parser') - - for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside', 'form', 'button']): - element.decompose() - - content_selectors = [ - 'article', - '[role="main"]', - '.content', - '.article-content', - '.post-content', - '.entry-content', - '.main-content', - 'main', - '.story-body', - '.article-body' - ] - - article_text = "" - - for selector in content_selectors: - elements = soup.select(selector) - if elements: - for element in elements: - text = element.get_text(separator=' ', strip=True) - if len(text) > len(article_text): - article_text = text - break - - # Fallback: get text from body if no specific content area found - if not article_text: - body = soup.find('body') - if body: - article_text = body.get_text(separator=' ', strip=True) - - article_text = re.sub(r'\s+', ' ', article_text) # Normalize whitespace - article_text = article_text.strip() - - # Limit length to avoid overwhelming the LLM - if len(article_text) > MAX_ARTICLE_LENGTH: - article_text = article_text[:MAX_ARTICLE_LENGTH] + "..." - - return article_text - - except httpx.TimeoutException: - logger.warning(f"⏰ Timeout fetching article content from: {url}") - return "" - except httpx.HTTPError as e: - logger.warning(f"🌐 HTTP error fetching article content from {url}: {e}") - return "" - except Exception as e: - logger.warning(f"❌ Error fetching article content from {url}: {type(e).__name__}: {e}") - return "" - - @staticmethod - def build_prompt(url: str, title: str = "", description: str = "", content: str = "") -> str: - """ - Generate a prompt for the LLM to summarize an article. - - Args: - url: Public URL of the article to summarize - title: Article title from RSS feed (optional) - description: Article description from RSS feed (optional) - content: Extracted article content (optional) - - Returns: - A formatted prompt string that instructs the LLM to generate - a JSON response with title and summaries in German and English - """ - context_info = [] - if title: - context_info.append(f"RSS-Titel: {title}") - if description: - context_info.append(f"RSS-Beschreibung: {description}") - if content: - content_preview = content[:500] + "..." if len(content) > 500 else content - context_info.append(f"Artikel-Inhalt: {content_preview}") - - context = "\n".join(context_info) if context_info else "Keine zusätzlichen Informationen verfügbar." - - return ( - "### Aufgabe\n" - f"Du sollst eine Nachricht basierend auf der URL und den verfügbaren Informationen zusammenfassen.\n" - f"URL: {url}\n" - f"Verfügbare Informationen:\n{context}\n\n" - "### Regeln\n" - "1. Nutze VORRANGIG den Artikel-Inhalt falls verfügbar, ergänze mit RSS-Informationen\n" - "2. Falls kein Artikel-Inhalt verfügbar ist, nutze RSS-Titel und -Beschreibung\n" - "3. Falls keine ausreichenden Informationen vorliegen, erstelle eine plausible Zusammenfassung basierend auf der URL\n" - "4. Gib ausschließlich **gültiges minifiziertes JSON** zurück – kein Markdown, keine Kommentare\n" - "5. 
Struktur: {\"title\":\"…\",\"description\":\"…\"}\n" - "6. title: Aussagekräftiger deutscher Titel (max 100 Zeichen)\n" - "7. description: Deutsche Zusammenfassung (zwischen 100 und 160 Wörter)\n" - "8. Kein Text vor oder nach dem JSON\n\n" - "### Ausgabe\n" - "Jetzt antworte mit dem JSON:" - ) - - @staticmethod - async def summarize_article( - client: httpx.AsyncClient, - url: str, - title: str = "", - description: str = "" - ) -> Optional[ArticleSummary]: - """ - Generate a summary of an article using the LLM. - Now fetches the actual article content for more accurate summaries. - - Args: - client: An active httpx AsyncClient for making requests - url: URL of the article to summarize - title: Article title from RSS feed - description: Article description from RSS feed - - Returns: - A dictionary containing the article title and summaries in German and English, - or None if summarization failed - """ - article_content = await NewsFetcher.fetch_article_content(client, url) - - if not article_content: - logger.warning(f"⚠️ Could not fetch article content, using RSS data only") - - prompt = NewsFetcher.build_prompt(url, title, description, article_content) - payload = { - "model": LLM_MODEL, - "prompt": prompt, - "stream": False, - "temperature": 0.1, - "format": "json" - } - - try: - response = await client.post( - f"{OLLAMA_HOST}/api/generate", - json=payload, - timeout=LLM_TIMEOUT_SECONDS - ) - - response.raise_for_status() - result = response.json() - llm_response = result["response"] - - if isinstance(llm_response, str): - summary_data = json.loads(llm_response) - else: - summary_data = llm_response - - # Validate required fields - required_fields = ["title", "description"] - missing_fields = [field for field in required_fields if field not in summary_data] - - if missing_fields: - logger.warning( - f"⚠️ Missing required fields in summary: {missing_fields}" - ) - return None - - # Check summary quality metrics - description = len(summary_data.get("description", "").split()) - - if description > 160 or description < 100: - logger.warning( - f"⚠️ Summary exceeds word limit - " - f"Description: {description}/160" - ) - - return cast(ArticleSummary, summary_data) - - except json.JSONDecodeError as e: - logger.error(f"❌ JSON parsing error for {url}: {e}") - logger.error( - f"🔍 Raw response that failed to parse: {llm_response[:500]}..." - ) - return None - except httpx.HTTPError as e: - logger.error(f"❌ HTTP error for {url}: {e}") - return None - except Exception as e: - logger.error(f"❌ Unexpected error summarizing {url}: {type(e).__name__}: {e}") - return None - - @staticmethod - async def harvest_feeds() -> None: - """ - Fetch articles from all feeds and store summaries in the database. - This is the main function that runs periodically to update the news database. - """ - - total_articles = 0 - successful_articles = 0 - failed_articles = 0 - - try: - with db_manager.get_cursor() as cursor: - cursor.execute("SELECT country, url FROM feeds") - feeds = cursor.fetchall() - - async with httpx.AsyncClient() as client: - for i, feed_row in enumerate(feeds, 1): - feed_stats = await NewsFetcher._process_feed(client, feed_row) - - total_articles += feed_stats['total'] - successful_articles += feed_stats['successful'] - failed_articles += feed_stats['failed'] - - current_time = int(datetime.now(timezone.utc).timestamp()) - with db_manager.get_cursor() as cursor: - cursor.execute( - "UPDATE meta SET val=? 
WHERE key='last_sync'", - (str(current_time),) - ) - - except Exception as e: - logger.error(f"❌ Critical error during harvest: {type(e).__name__}: {e}") - raise - - @staticmethod - async def _process_feed( - client: httpx.AsyncClient, - feed_row: sqlite3.Row - ) -> Dict[str, int]: - """ - Process a single feed, fetching and summarizing all articles. - Now saves summaries immediately to the database with better concurrency. - - Args: - client: An active httpx AsyncClient for making requests - feed_row: A database row containing feed information - - Returns: - Dictionary with processing statistics - """ - stats = {'total': 0, 'successful': 0, 'failed': 0, 'skipped': 0} - - try: - feed_data = feedparser.parse(feed_row["url"]) - - if hasattr(feed_data, 'bozo') and feed_data.bozo: - logger.warning(f"⚠️ Feed has parsing issues: {feed_row['url']}") - if hasattr(feed_data, 'bozo_exception'): - logger.warning(f"⚠️ Feed exception: {feed_data.bozo_exception}") - - total_entries = len(feed_data.entries) - - if total_entries == 0: - logger.warning(f"⚠️ No entries found in feed: {feed_row['url']}") - return stats - - for i, entry in enumerate(feed_data.entries, 1): - stats['total'] += 1 - - if not hasattr(entry, "link"): - stats['skipped'] += 1 - continue - - if not hasattr(entry, "published_parsed"): - stats['skipped'] += 1 - continue - - article_url = entry.link - - try: - published = datetime( - *entry.published_parsed[:6], - tzinfo=timezone.utc - ) - except (TypeError, ValueError): - stats['skipped'] += 1 - continue - - # Check if article already exists - use readonly connection for better concurrency - try: - with db_manager.get_cursor_with_retry(readonly=True) as cursor: - cursor.execute("SELECT id FROM news WHERE url = ?", (article_url,)) - if cursor.fetchone(): - stats['skipped'] += 1 - continue - except Exception as db_error: - logger.warning(f"⚠️ Database check failed for article {i}, continuing: {db_error}") - - rss_title = getattr(entry, 'title', '') - rss_description = getattr(entry, 'description', '') or getattr(entry, 'summary', '') - - summary = await NewsFetcher.summarize_article( - client, - article_url, - title=rss_title, - description=rss_description - ) - - if not summary: - logger.warning(f"❌ Failed to get summary for article {i}: {article_url}") - stats['failed'] += 1 - continue - - published_timestamp = int(published.timestamp()) - - try: - with db_manager.get_cursor_with_retry(readonly=False) as cursor: - cursor.execute( - """ - INSERT OR IGNORE INTO news - (title, description, url, published, country) - VALUES (?, ?, ?, ?, ?) 
- """, - ( - summary["title"], - summary["description"], - article_url, - published_timestamp, - feed_row["country"], - ) - ) - - stats['successful'] += 1 - - except Exception as db_error: - logger.error(f"❌ Database error for article {i}: {db_error}") - stats['failed'] += 1 - continue - - await asyncio.sleep(0.01) # 10ms delay to yield control - - except Exception as e: - logger.error(f"❌ Error processing feed {feed_row['url']}: {type(e).__name__}: {e}") - - return stats - - scheduler = AsyncIOScheduler(timezone="UTC") scheduler.add_job( NewsFetcher.harvest_feeds, @@ -738,74 +46,13 @@ scheduler.add_job( scheduler.start() -# Pydantic models for API requests and responses -class CronSettings(BaseModel): - """Settings for the cron job that harvests news.""" - hours: float - - -class FeedData(BaseModel): - """Data for a news feed.""" - country: str - url: str - - -class ModelStatus(BaseModel): - """Status of the LLM model.""" - name: str - status: str - available_models: List[str] - - -class ErrorResponse(BaseModel): - """Standard error response.""" - status: str - message: str - - -class SuccessResponse(BaseModel): - """Standard success response.""" - status: str - - -class TimestampResponse(BaseModel): - """Response containing a timestamp.""" - ts: int - - -class HoursResponse(BaseModel): - """Response containing hours setting.""" - hours: float - - -async def get_db(): - """ - Dependency that provides a database cursor with retry logic. - - Yields: - A database cursor for executing SQL statements - """ - with db_manager.get_cursor_with_retry(readonly=True) as cursor: - yield cursor - -async def get_db_write(): - """ - Dependency that provides a database cursor for write operations with retry logic. - - Yields: - A database cursor for executing SQL statements - """ - with db_manager.get_cursor_with_retry(readonly=False) as cursor: - yield cursor - - # API endpoints @app.get("/news", response_model=List[Dict[str, Any]]) async def get_news( - country: str = "DE", - from_: str = "2025-07-01", - to_: str = datetime.now(timezone.utc).strftime("%Y-%m-%d"), - db: sqlite3.Cursor = Depends(get_db) + country: str = "DE", + from_: str = "2025-07-01", + to_: str = datetime.now(timezone.utc).strftime("%Y-%m-%d"), + db: sqlite3.Cursor = Depends(get_db) ): """ Get news articles filtered by country and date range. @@ -829,11 +76,11 @@ async def get_news( db.execute( """ - SELECT id, title, description, url, published, country, created_at + SELECT id, title, description, url, published, country, created_at FROM news - WHERE country = ? AND published BETWEEN ? AND ? - ORDER BY published DESC - LIMIT 1000 + WHERE country = ? + AND published BETWEEN ? AND ? + ORDER BY published DESC LIMIT 1000 """, (country, from_ts, to_ts) ) @@ -891,8 +138,8 @@ async def get_last_sync(db: sqlite3.Cursor = Depends(get_db)): @app.post("/feeds", response_model=SuccessResponse) async def add_feed( - feed: FeedData, - db: sqlite3.Cursor = Depends(get_db_write) + feed: FeedData, + db: sqlite3.Cursor = Depends(get_db_write) ): """ Add a new news feed. @@ -920,8 +167,8 @@ async def add_feed( @app.delete("/feeds", response_model=SuccessResponse) async def delete_feed( - url: str, - db: sqlite3.Cursor = Depends(get_db_write) + url: str, + db: sqlite3.Cursor = Depends(get_db_write) ): """ Delete a news feed by URL. 
@@ -1062,9 +309,4 @@ async def update_cron_schedule(data: CronSettings, db: sqlite3.Cursor = Depends(
 
 
 # Mount static frontend
-frontend_path = os.path.join(
-    os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
-    "frontend",
-    "dist"
-)
 app.mount("/", StaticFiles(directory=frontend_path, html=True), name="static")
diff --git a/backend/app/models.py b/backend/app/models.py
new file mode 100644
index 0000000..984bc2f
--- /dev/null
+++ b/backend/app/models.py
@@ -0,0 +1,50 @@
+from typing import TypedDict, List
+
+from pydantic import BaseModel
+
+
+class ArticleSummary(TypedDict):
+    """Type definition for the article summary data returned from the LLM:
+    a German title and description."""
+    title: str
+    description: str
+
+
+# Pydantic models for API requests and responses
+class CronSettings(BaseModel):
+    """Settings for the cron job that harvests news."""
+    hours: float
+
+
+class FeedData(BaseModel):
+    """Data for a news feed."""
+    country: str
+    url: str
+
+
+class ModelStatus(BaseModel):
+    """Status of the LLM model."""
+    name: str
+    status: str
+    available_models: List[str]
+
+
+class ErrorResponse(BaseModel):
+    """Standard error response."""
+    status: str
+    message: str
+
+
+class SuccessResponse(BaseModel):
+    """Standard success response."""
+    status: str
+
+
+class TimestampResponse(BaseModel):
+    """Response containing a timestamp."""
+    ts: int
+
+
+class HoursResponse(BaseModel):
+    """Response containing hours setting."""
+    hours: float
diff --git a/backend/app/services.py b/backend/app/services.py
new file mode 100644
index 0000000..ff92f82
--- /dev/null
+++ b/backend/app/services.py
@@ -0,0 +1,381 @@
+import asyncio
+import json
+import re
+import sqlite3
+from datetime import datetime, timezone
+from typing import Optional, cast, Dict
+
+import feedparser
+import httpx
+from bs4 import BeautifulSoup
+
+from backend.app.config import ARTICLE_FETCH_TIMEOUT, MAX_ARTICLE_LENGTH, logger, LLM_MODEL, OLLAMA_HOST, \
+    LLM_TIMEOUT_SECONDS
+from backend.app.database import db_manager
+from backend.app.models import ArticleSummary
+
+
+class NewsFetcher:
+    """
+    Handles fetching and summarizing news articles from RSS feeds.
+    Uses Ollama/qwen to generate summaries of articles.
+    """
+
+    @staticmethod
+    async def fetch_article_content(client: httpx.AsyncClient, url: str) -> str:
+        """
+        Fetch and extract the main content from an article URL.
+ + Args: + client: An active httpx AsyncClient for making requests + url: URL of the article to fetch + + Returns: + Extracted text content from the article, or empty string if failed + """ + try: + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' + 'AppleWebKit/537.36 (KHTML, like Gecko) ' + 'Chrome/91.0.4472.124 Safari/537.36' + } + + response = await client.get( + url, + headers=headers, + timeout=ARTICLE_FETCH_TIMEOUT, + follow_redirects=True + ) + + response.raise_for_status() + + soup = BeautifulSoup(response.text, 'html.parser') + + for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside', 'form', 'button']): + element.decompose() + + content_selectors = [ + 'article', + '[role="main"]', + '.content', + '.article-content', + '.post-content', + '.entry-content', + '.main-content', + 'main', + '.story-body', + '.article-body' + ] + + article_text = "" + + for selector in content_selectors: + elements = soup.select(selector) + if elements: + for element in elements: + text = element.get_text(separator=' ', strip=True) + if len(text) > len(article_text): + article_text = text + break + + # Fallback: get text from body if no specific content area found + if not article_text: + body = soup.find('body') + if body: + article_text = body.get_text(separator=' ', strip=True) + + article_text = re.sub(r'\s+', ' ', article_text) # Normalize whitespace + article_text = article_text.strip() + + # Limit length to avoid overwhelming the LLM + if len(article_text) > MAX_ARTICLE_LENGTH: + article_text = article_text[:MAX_ARTICLE_LENGTH] + "..." + + return article_text + + except httpx.TimeoutException: + logger.warning(f"⏰ Timeout fetching article content from: {url}") + return "" + except httpx.HTTPError as e: + logger.warning(f"🌐 HTTP error fetching article content from {url}: {e}") + return "" + except Exception as e: + logger.warning(f"❌ Error fetching article content from {url}: {type(e).__name__}: {e}") + return "" + + @staticmethod + def build_prompt(url: str, title: str = "", description: str = "", content: str = "") -> str: + """ + Generate a prompt for the LLM to summarize an article. + + Args: + url: Public URL of the article to summarize + title: Article title from RSS feed (optional) + description: Article description from RSS feed (optional) + content: Extracted article content (optional) + + Returns: + A formatted prompt string that instructs the LLM to generate + a JSON response with title and summaries in German and English + """ + context_info = [] + if title: + context_info.append(f"RSS-Titel: {title}") + if description: + context_info.append(f"RSS-Beschreibung: {description}") + if content: + content_preview = content[:500] + "..." if len(content) > 500 else content + context_info.append(f"Artikel-Inhalt: {content_preview}") + + context = "\n".join(context_info) if context_info else "Keine zusätzlichen Informationen verfügbar." + + return ( + "### Aufgabe\n" + f"Du sollst eine Nachricht basierend auf der URL und den verfügbaren Informationen zusammenfassen.\n" + f"URL: {url}\n" + f"Verfügbare Informationen:\n{context}\n\n" + "### Regeln\n" + "1. Nutze VORRANGIG den Artikel-Inhalt falls verfügbar, ergänze mit RSS-Informationen\n" + "2. Falls kein Artikel-Inhalt verfügbar ist, nutze RSS-Titel und -Beschreibung\n" + "3. Falls keine ausreichenden Informationen vorliegen, erstelle eine plausible Zusammenfassung basierend auf der URL\n" + "4. Gib ausschließlich **gültiges minifiziertes JSON** zurück – kein Markdown, keine Kommentare\n" + "5. 
Struktur: {\"title\":\"…\",\"description\":\"…\"}\n" + "6. title: Aussagekräftiger deutscher Titel (max 100 Zeichen)\n" + "7. description: Deutsche Zusammenfassung (zwischen 100 und 160 Wörter)\n" + "8. Kein Text vor oder nach dem JSON\n\n" + "### Ausgabe\n" + "Jetzt antworte mit dem JSON:" + ) + + @staticmethod + async def summarize_article( + client: httpx.AsyncClient, + url: str, + title: str = "", + description: str = "" + ) -> Optional[ArticleSummary]: + """ + Generate a summary of an article using the LLM. + Now fetches the actual article content for more accurate summaries. + + Args: + client: An active httpx AsyncClient for making requests + url: URL of the article to summarize + title: Article title from RSS feed + description: Article description from RSS feed + + Returns: + A dictionary containing the article title and summaries in German and English, + or None if summarization failed + """ + article_content = await NewsFetcher.fetch_article_content(client, url) + + if not article_content: + logger.warning(f"⚠️ Could not fetch article content, using RSS data only") + + prompt = NewsFetcher.build_prompt(url, title, description, article_content) + payload = { + "model": LLM_MODEL, + "prompt": prompt, + "stream": False, + "temperature": 0.1, + "format": "json" + } + + try: + response = await client.post( + f"{OLLAMA_HOST}/api/generate", + json=payload, + timeout=LLM_TIMEOUT_SECONDS + ) + + response.raise_for_status() + result = response.json() + llm_response = result["response"] + + if isinstance(llm_response, str): + summary_data = json.loads(llm_response) + else: + summary_data = llm_response + + # Validate required fields + required_fields = ["title", "description"] + missing_fields = [field for field in required_fields if field not in summary_data] + + if missing_fields: + logger.warning( + f"⚠️ Missing required fields in summary: {missing_fields}" + ) + return None + + # Check summary quality metrics + description = len(summary_data.get("description", "").split()) + + if description > 160 or description < 100: + logger.warning( + f"⚠️ Summary exceeds word limit - " + f"Description: {description}/160" + ) + + return cast(ArticleSummary, summary_data) + + except json.JSONDecodeError as e: + logger.error(f"❌ JSON parsing error for {url}: {e}") + logger.error( + f"🔍 Raw response that failed to parse: {llm_response[:500]}..." + ) + return None + except httpx.HTTPError as e: + logger.error(f"❌ HTTP error for {url}: {e}") + return None + except Exception as e: + logger.error(f"❌ Unexpected error summarizing {url}: {type(e).__name__}: {e}") + return None + + @staticmethod + async def harvest_feeds() -> None: + """ + Fetch articles from all feeds and store summaries in the database. + This is the main function that runs periodically to update the news database. + """ + + total_articles = 0 + successful_articles = 0 + failed_articles = 0 + + try: + with db_manager.get_cursor() as cursor: + cursor.execute("SELECT country, url FROM feeds") + feeds = cursor.fetchall() + + async with httpx.AsyncClient() as client: + for i, feed_row in enumerate(feeds, 1): + feed_stats = await NewsFetcher._process_feed(client, feed_row) + + total_articles += feed_stats['total'] + successful_articles += feed_stats['successful'] + failed_articles += feed_stats['failed'] + + current_time = int(datetime.now(timezone.utc).timestamp()) + with db_manager.get_cursor() as cursor: + cursor.execute( + "UPDATE meta SET val=? 
WHERE key='last_sync'", + (str(current_time),) + ) + + except Exception as e: + logger.error(f"❌ Critical error during harvest: {type(e).__name__}: {e}") + raise + + @staticmethod + async def _process_feed( + client: httpx.AsyncClient, + feed_row: sqlite3.Row + ) -> Dict[str, int]: + """ + Process a single feed, fetching and summarizing all articles. + Now saves summaries immediately to the database with better concurrency. + + Args: + client: An active httpx AsyncClient for making requests + feed_row: A database row containing feed information + + Returns: + Dictionary with processing statistics + """ + stats = {'total': 0, 'successful': 0, 'failed': 0, 'skipped': 0} + + try: + feed_data = feedparser.parse(feed_row["url"]) + + if hasattr(feed_data, 'bozo') and feed_data.bozo: + logger.warning(f"⚠️ Feed has parsing issues: {feed_row['url']}") + if hasattr(feed_data, 'bozo_exception'): + logger.warning(f"⚠️ Feed exception: {feed_data.bozo_exception}") + + total_entries = len(feed_data.entries) + + if total_entries == 0: + logger.warning(f"⚠️ No entries found in feed: {feed_row['url']}") + return stats + + for i, entry in enumerate(feed_data.entries, 1): + stats['total'] += 1 + + if not hasattr(entry, "link"): + stats['skipped'] += 1 + continue + + if not hasattr(entry, "published_parsed"): + stats['skipped'] += 1 + continue + + article_url = entry.link + + try: + published = datetime( + *entry.published_parsed[:6], + tzinfo=timezone.utc + ) + except (TypeError, ValueError): + stats['skipped'] += 1 + continue + + # Check if article already exists - use readonly connection for better concurrency + try: + with db_manager.get_cursor_with_retry(readonly=True) as cursor: + cursor.execute("SELECT id FROM news WHERE url = ?", (article_url,)) + if cursor.fetchone(): + stats['skipped'] += 1 + continue + except Exception as db_error: + logger.warning(f"⚠️ Database check failed for article {i}, continuing: {db_error}") + + rss_title = getattr(entry, 'title', '') + rss_description = getattr(entry, 'description', '') or getattr(entry, 'summary', '') + + summary = await NewsFetcher.summarize_article( + client, + article_url, + title=rss_title, + description=rss_description + ) + + if not summary: + logger.warning(f"❌ Failed to get summary for article {i}: {article_url}") + stats['failed'] += 1 + continue + + published_timestamp = int(published.timestamp()) + + try: + with db_manager.get_cursor_with_retry(readonly=False) as cursor: + cursor.execute( + """ + INSERT + OR IGNORE INTO news + (title, description, url, published, country) + VALUES (?, ?, ?, ?, ?) + """, + ( + summary["title"], + summary["description"], + article_url, + published_timestamp, + feed_row["country"], + ) + ) + + stats['successful'] += 1 + + except Exception as db_error: + logger.error(f"❌ Database error for article {i}: {db_error}") + stats['failed'] += 1 + continue + + await asyncio.sleep(0.01) # 10ms delay to yield control + + except Exception as e: + logger.error(f"❌ Error processing feed {feed_row['url']}: {type(e).__name__}: {e}") + + return stats
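For a quick end-to-end check of the extracted modules, the service layer can be driven directly without starting the API. A minimal sketch, assuming it is run from the repository root (so the backend.app package resolves) and that an Ollama instance is reachable at the configured OLLAMA_HOST; the script itself is illustrative and not part of this change:

import asyncio

from backend.app.config import LLM_MODEL, OLLAMA_HOST
from backend.app.database import db_manager
from backend.app.services import NewsFetcher


def main() -> None:
    # Importing backend.app.database already created db_manager, which
    # applied schema.sql and seeded feeds/settings on first run.
    print(f"Harvesting with model {LLM_MODEL} via {OLLAMA_HOST}")
    asyncio.run(NewsFetcher.harvest_feeds())

    with db_manager.get_cursor(readonly=True) as cursor:
        cursor.execute("SELECT COUNT(*) AS n FROM news")
        print(f"Articles stored: {cursor.fetchone()['n']}")


if __name__ == "__main__":
    main()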