"""
Owly News Summariser Backend

This module provides a FastAPI application that serves as the backend for the Owly News Summariser.
It handles fetching news from RSS feeds, summarizing articles using Ollama/qwen, and providing
an API for the frontend to access the summarized news.

The application uses SQLite for data storage and APScheduler for scheduling periodic news harvesting.
"""

# Standard library imports
import asyncio
import json
import logging
import os
import re
import sqlite3
import time
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple, TypedDict, Union, cast

# Third-party imports
import feedparser
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from bs4 import BeautifulSoup
from fastapi import Depends, FastAPI, HTTPException, Response, status
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

DB_PATH = Path(os.getenv("DB_NAME", "owlynews.sqlite3"))
OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434")
MIN_CRON_HOURS = float(os.getenv("MIN_CRON_HOURS", 0.5))
DEFAULT_CRON_HOURS = float(os.getenv("CRON_HOURS", MIN_CRON_HOURS))
CRON_HOURS = max(MIN_CRON_HOURS, DEFAULT_CRON_HOURS)
SYNC_COOLDOWN_MINUTES = int(os.getenv("SYNC_COOLDOWN_MINUTES", 30))
LLM_MODEL = os.getenv("LLM_MODEL", "qwen2:7b-instruct-q4_K_M")
LLM_TIMEOUT_SECONDS = int(os.getenv("LLM_TIMEOUT_SECONDS", 180))
OLLAMA_API_TIMEOUT_SECONDS = int(os.getenv("OLLAMA_API_TIMEOUT_SECONDS", 10))
ARTICLE_FETCH_TIMEOUT = int(os.getenv("ARTICLE_FETCH_TIMEOUT", 30))
MAX_ARTICLE_LENGTH = int(os.getenv("MAX_ARTICLE_LENGTH", 5000))


def update_constants_from_db(settings_dict):
    """
    Update global constants with values from the database settings.
    Environment variables take precedence over database settings.

    Args:
        settings_dict: Dictionary of settings from the database
    """
    global OLLAMA_HOST, MIN_CRON_HOURS, CRON_HOURS, SYNC_COOLDOWN_MINUTES
    global LLM_MODEL, LLM_TIMEOUT_SECONDS, OLLAMA_API_TIMEOUT_SECONDS
    global ARTICLE_FETCH_TIMEOUT, MAX_ARTICLE_LENGTH

    if 'ollama_host' in settings_dict and os.getenv("OLLAMA_HOST") is None:
        OLLAMA_HOST = settings_dict['ollama_host']

    if 'min_cron_hours' in settings_dict and os.getenv("MIN_CRON_HOURS") is None:
        try:
            MIN_CRON_HOURS = float(settings_dict['min_cron_hours'])
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid min_cron_hours value in DB: "
                f"{settings_dict['min_cron_hours']}"
            )

    if 'cron_hours' in settings_dict and os.getenv("CRON_HOURS") is None:
        try:
            cron_hours_value = float(settings_dict['cron_hours'])
            CRON_HOURS = max(MIN_CRON_HOURS, cron_hours_value)
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid cron_hours value in DB: "
                f"{settings_dict['cron_hours']}"
            )

    if 'sync_cooldown_minutes' in settings_dict and os.getenv("SYNC_COOLDOWN_MINUTES") is None:
        try:
            SYNC_COOLDOWN_MINUTES = int(settings_dict['sync_cooldown_minutes'])
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid sync_cooldown_minutes value in DB: "
                f"{settings_dict['sync_cooldown_minutes']}"
            )

    if 'llm_model' in settings_dict and os.getenv("LLM_MODEL") is None:
        LLM_MODEL = settings_dict['llm_model']

    if 'llm_timeout_seconds' in settings_dict and os.getenv("LLM_TIMEOUT_SECONDS") is None:
        try:
            LLM_TIMEOUT_SECONDS = int(settings_dict['llm_timeout_seconds'])
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid llm_timeout_seconds value in DB: "
                f"{settings_dict['llm_timeout_seconds']}"
            )

    if 'ollama_api_timeout_seconds' in settings_dict and os.getenv("OLLAMA_API_TIMEOUT_SECONDS") is None:
        try:
            OLLAMA_API_TIMEOUT_SECONDS = int(settings_dict['ollama_api_timeout_seconds'])
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid ollama_api_timeout_seconds value in DB: "
                f"{settings_dict['ollama_api_timeout_seconds']}"
            )

    if 'article_fetch_timeout' in settings_dict and os.getenv("ARTICLE_FETCH_TIMEOUT") is None:
        try:
            ARTICLE_FETCH_TIMEOUT = int(settings_dict['article_fetch_timeout'])
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid article_fetch_timeout value in DB: "
                f"{settings_dict['article_fetch_timeout']}"
            )

    if 'max_article_length' in settings_dict and os.getenv("MAX_ARTICLE_LENGTH") is None:
        try:
            MAX_ARTICLE_LENGTH = int(settings_dict['max_article_length'])
        except (ValueError, TypeError):
            logger.warning(
                f"⚠️ Invalid max_article_length value in DB: "
                f"{settings_dict['max_article_length']}"
            )

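# Illustrative precedence example (not executed at import time; the values are
# made up): with OLLAMA_HOST set in the environment, the stored ollama_host row
# is ignored, while an unset CRON_HOURS is taken from the database and clamped
# to MIN_CRON_HOURS:
#
#   os.environ["OLLAMA_HOST"] = "http://gpu-box:11434"
#   update_constants_from_db({"ollama_host": "http://db-host:11434", "cron_hours": "2"})
#   # OLLAMA_HOST stays "http://gpu-box:11434"; CRON_HOURS becomes 2.0
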
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Owly News Summariser",
    description="API for the Owly News Summariser application",
    version="1.0.0"
)


class DatabaseManager:
    """
    Manages database connections and operations for the application.
    Provides methods for initializing the database, executing queries,
    and managing transactions.
    """

    def __init__(self, db_path: Path):
        """
        Initialize the database manager with the given database path.

        Args:
            db_path: Path to the SQLite database file
        """
        self.db_path = db_path
        self._initialize_db()

    def _initialize_db(self) -> None:
        """
        Initialize the database by creating tables if they don't exist.
        Also seeds initial feeds from seed_feeds.json and settings from global constants.
        After initialization, updates global constants with values from the database.
        """
        try:
            schema_file = Path(__file__).parent / "schema.sql"
            if not schema_file.exists():
                logger.error("❌ schema.sql not found")
                raise FileNotFoundError("schema.sql not found")

            with open(schema_file, 'r', encoding='utf-8') as f:
                schema_sql = f.read()

            with self.get_cursor() as cursor:
                statements = [stmt.strip() for stmt in schema_sql.split(';') if stmt.strip()]
                for statement in statements:
                    cursor.execute(statement)

                cursor.execute("SELECT COUNT(*) FROM feeds")
                feed_count = cursor.fetchone()[0]

                if feed_count == 0:
                    self._seed_feeds(cursor)

                cursor.execute("SELECT COUNT(*) FROM settings")
                settings_count = cursor.fetchone()[0]

                if settings_count == 0:
                    self._seed_settings(cursor)

            settings = self.get_all_settings()
            update_constants_from_db(settings)
        except Exception as e:
            logger.error(f"❌ Failed to initialize database: {e}")
            raise

    def get_all_settings(self) -> dict:
        """
        Retrieve all settings from the database.

        Returns:
            Dictionary of settings with key-value pairs
        """
        settings = {}
        try:
            with self.get_cursor(readonly=True) as cursor:
                cursor.execute("SELECT key, val FROM settings")
                for row in cursor.fetchall():
                    settings[row['key']] = row['val']
            return settings
        except Exception as e:
            logger.error(f"❌ Failed to retrieve settings from database: {e}")
            return {}

    def _seed_feeds(self, cursor: sqlite3.Cursor) -> None:
        """
        Seed initial feeds from the seed_feeds.json file.
        """
        try:
            seed_file = Path(__file__).parent / "seed_feeds.json"

            if not seed_file.exists():
                logger.warning("⚠️ seed_feeds.json not found, skipping feed seeding")
                return

            with open(seed_file, 'r', encoding='utf-8') as f:
                feeds_data = json.load(f)

            for country, urls in feeds_data.items():
                for url in urls:
                    cursor.execute(
                        "INSERT OR IGNORE INTO feeds (country, url) VALUES (?, ?)",
                        (country, url)
                    )

        except Exception as e:
            logger.error(f"❌ Failed to seed feeds: {e}")

    def _seed_settings(self, cursor: sqlite3.Cursor) -> None:
        """
        Seed initial settings from global constants.
        """
        try:
            settings_data = {
                'ollama_host': OLLAMA_HOST,
                'min_cron_hours': MIN_CRON_HOURS,
                'cron_hours': CRON_HOURS,
                'sync_cooldown_minutes': SYNC_COOLDOWN_MINUTES,
                'llm_model': LLM_MODEL,
                'llm_timeout_seconds': LLM_TIMEOUT_SECONDS,
                'ollama_api_timeout_seconds': OLLAMA_API_TIMEOUT_SECONDS,
                'article_fetch_timeout': ARTICLE_FETCH_TIMEOUT,
                'max_article_length': MAX_ARTICLE_LENGTH
            }

            for key, val in settings_data.items():
                cursor.execute(
                    "INSERT OR IGNORE INTO settings (key, val) VALUES (?, ?)",
                    (key, str(val))
                )

        except Exception as e:
            logger.error(f"❌ Failed to seed settings: {e}")

    def _get_connection(self) -> sqlite3.Connection:
        """
        Create a thread-safe database connection.

        Returns:
            An active SQLite connection
        """
        conn = sqlite3.connect(
            self.db_path,
            check_same_thread=False,
            timeout=30.0
        )
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=30000")
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute("PRAGMA temp_store=MEMORY")
        return conn

    @contextmanager
    def get_cursor(self, readonly: bool = False) -> Iterator[sqlite3.Cursor]:
        """
        Context manager that provides a database cursor and handles commits and rollbacks.

        Args:
            readonly: If True, runs in a deferred read transaction for better concurrency

        Yields:
            A database cursor for executing SQL statements
        """
        conn = None
        try:
            conn = self._get_connection()

            if readonly:
                conn.execute("BEGIN DEFERRED")

            cursor = conn.cursor()
            yield cursor

            if not readonly:
                conn.commit()
        except sqlite3.OperationalError as e:
            if conn:
                conn.rollback()
            if "database is locked" in str(e).lower():
                logger.warning(
                    f"⚠️ Database temporarily locked, operation may need retry: {e}"
                )
            raise e
        except Exception as e:
            if conn:
                conn.rollback()
            raise e
        finally:
            if conn:
                conn.close()

    @contextmanager
    def get_cursor_with_retry(self, readonly: bool = False, max_retries: int = 3) -> Iterator[sqlite3.Cursor]:
        """
        Context manager with retry logic for database operations.

        Args:
            readonly: If True, opens connection in readonly mode
            max_retries: Maximum number of retry attempts

        Yields:
            A database cursor for executing SQL statements
        """
        for attempt in range(max_retries + 1):
            try:
                with self.get_cursor(readonly=readonly) as cursor:
                    yield cursor
                    return
            except sqlite3.OperationalError as e:
                if "database is locked" in str(e).lower() and attempt < max_retries:
                    wait_time = (attempt + 1) * 0.1
                    logger.warning(
                        f"⚠️ Database locked, retrying in {wait_time}s "
                        f"(attempt {attempt + 1}/{max_retries + 1})"
                    )
                    time.sleep(wait_time)
                    continue
                raise e


db_manager = DatabaseManager(DB_PATH)

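# Usage sketch for the cursor helpers above (illustrative; not executed at
# import time). Both context managers yield a sqlite3.Cursor and handle
# commit/rollback; the *_with_retry variant also retries briefly when SQLite
# reports "database is locked":
#
#   with db_manager.get_cursor_with_retry(readonly=True) as cur:
#       cur.execute("SELECT COUNT(*) FROM news")
#       total = cur.fetchone()[0]
#
#   with db_manager.get_cursor() as cur:  # read-write, committed on success
#       cur.execute("INSERT OR IGNORE INTO feeds (country, url) VALUES (?, ?)",
#                   ("DE", "https://example.com/rss"))
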
class ArticleSummary(TypedDict):
    """Type definition for article summary data returned from the LLM."""
    title: str
    description: str


class NewsFetcher:
    """
    Handles fetching and summarizing news articles from RSS feeds.
    Uses Ollama/qwen to generate summaries of articles.
    """

    @staticmethod
    async def fetch_article_content(client: httpx.AsyncClient, url: str) -> str:
        """
        Fetch and extract the main content from an article URL.

        Args:
            client: An active httpx AsyncClient for making requests
            url: URL of the article to fetch

        Returns:
            Extracted text content from the article, or an empty string if fetching failed
        """
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                              'AppleWebKit/537.36 (KHTML, like Gecko) '
                              'Chrome/91.0.4472.124 Safari/537.36'
            }

            response = await client.get(
                url,
                headers=headers,
                timeout=ARTICLE_FETCH_TIMEOUT,
                follow_redirects=True
            )

            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside', 'form', 'button']):
                element.decompose()

            content_selectors = [
                'article',
                '[role="main"]',
                '.content',
                '.article-content',
                '.post-content',
                '.entry-content',
                '.main-content',
                'main',
                '.story-body',
                '.article-body'
            ]

            article_text = ""

            for selector in content_selectors:
                elements = soup.select(selector)
                if elements:
                    for element in elements:
                        text = element.get_text(separator=' ', strip=True)
                        if len(text) > len(article_text):
                            article_text = text
                    break

            # Fallback: get text from body if no specific content area found
            if not article_text:
                body = soup.find('body')
                if body:
                    article_text = body.get_text(separator=' ', strip=True)

            article_text = re.sub(r'\s+', ' ', article_text)  # Normalize whitespace
            article_text = article_text.strip()

            # Limit length to avoid overwhelming the LLM
            if len(article_text) > MAX_ARTICLE_LENGTH:
                article_text = article_text[:MAX_ARTICLE_LENGTH] + "..."

            return article_text

        except httpx.TimeoutException:
            logger.warning(f"⏰ Timeout fetching article content from: {url}")
            return ""
        except httpx.HTTPError as e:
            logger.warning(f"🌐 HTTP error fetching article content from {url}: {e}")
            return ""
        except Exception as e:
            logger.warning(f"❌ Error fetching article content from {url}: {type(e).__name__}: {e}")
            return ""

    @staticmethod
    def build_prompt(url: str, title: str = "", description: str = "", content: str = "") -> str:
        """
        Generate a prompt for the LLM to summarize an article.

        Args:
            url: Public URL of the article to summarize
            title: Article title from RSS feed (optional)
            description: Article description from RSS feed (optional)
            content: Extracted article content (optional)

        Returns:
            A formatted prompt string that instructs the LLM to return
            minified JSON with a German title and description
        """
        context_info = []
        if title:
            context_info.append(f"RSS-Titel: {title}")
        if description:
            context_info.append(f"RSS-Beschreibung: {description}")
        if content:
            content_preview = content[:500] + "..." if len(content) > 500 else content
            context_info.append(f"Artikel-Inhalt: {content_preview}")

        context = "\n".join(context_info) if context_info else "Keine zusätzlichen Informationen verfügbar."

        return (
            "### Aufgabe\n"
            f"Du sollst eine Nachricht basierend auf der URL und den verfügbaren Informationen zusammenfassen.\n"
            f"URL: {url}\n"
            f"Verfügbare Informationen:\n{context}\n\n"
            "### Regeln\n"
            "1. Nutze VORRANGIG den Artikel-Inhalt falls verfügbar, ergänze mit RSS-Informationen\n"
            "2. Falls kein Artikel-Inhalt verfügbar ist, nutze RSS-Titel und -Beschreibung\n"
            "3. Falls keine ausreichenden Informationen vorliegen, erstelle eine plausible Zusammenfassung basierend auf der URL\n"
            "4. Gib ausschließlich **gültiges minifiziertes JSON** zurück – kein Markdown, keine Kommentare\n"
            "5. Struktur: {\"title\":\"…\",\"description\":\"…\"}\n"
            "6. title: Aussagekräftiger deutscher Titel (max 100 Zeichen)\n"
            "7. description: Deutsche Zusammenfassung (zwischen 100 und 160 Wörter)\n"
            "8. Kein Text vor oder nach dem JSON\n\n"
            "### Ausgabe\n"
            "Jetzt antworte mit dem JSON:"
        )

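    # Expected shape of the model's reply to the prompt above (illustrative;
    # the actual wording depends on the article and on LLM_MODEL):
    #
    #   {"title":"Kurzer deutscher Titel","description":"Deutsche Zusammenfassung …"}
    #
    # summarize_article() below parses this JSON and checks both keys before
    # an article is stored.
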
    @staticmethod
    async def summarize_article(
            client: httpx.AsyncClient,
            url: str,
            title: str = "",
            description: str = ""
    ) -> Optional[ArticleSummary]:
        """
        Generate a summary of an article using the LLM.
        Fetches the actual article content for more accurate summaries.

        Args:
            client: An active httpx AsyncClient for making requests
            url: URL of the article to summarize
            title: Article title from RSS feed
            description: Article description from RSS feed

        Returns:
            A dictionary containing the German article title and description,
            or None if summarization failed
        """
        article_content = await NewsFetcher.fetch_article_content(client, url)

        if not article_content:
            logger.warning(f"⚠️ Could not fetch article content for {url}, using RSS data only")

        prompt = NewsFetcher.build_prompt(url, title, description, article_content)
        payload = {
            "model": LLM_MODEL,
            "prompt": prompt,
            "stream": False,
            "temperature": 0.1,
            "format": "json"
        }

        try:
            response = await client.post(
                f"{OLLAMA_HOST}/api/generate",
                json=payload,
                timeout=LLM_TIMEOUT_SECONDS
            )

            response.raise_for_status()
            result = response.json()
            llm_response = result["response"]

            if isinstance(llm_response, str):
                summary_data = json.loads(llm_response)
            else:
                summary_data = llm_response

            # Validate required fields
            required_fields = ["title", "description"]
            missing_fields = [field for field in required_fields if field not in summary_data]

            if missing_fields:
                logger.warning(
                    f"⚠️ Missing required fields in summary: {missing_fields}"
                )
                return None

            # Check summary quality metrics
            description_word_count = len(summary_data.get("description", "").split())

            if description_word_count > 160 or description_word_count < 100:
                logger.warning(
                    f"⚠️ Summary outside the 100-160 word range - "
                    f"Description: {description_word_count} words"
                )

            return cast(ArticleSummary, summary_data)

        except json.JSONDecodeError as e:
            logger.error(f"❌ JSON parsing error for {url}: {e}")
            logger.error(
                f"🔍 Raw response that failed to parse: {llm_response[:500]}..."
            )
            return None
        except httpx.HTTPError as e:
            logger.error(f"❌ HTTP error for {url}: {e}")
            return None
        except Exception as e:
            logger.error(f"❌ Unexpected error summarizing {url}: {type(e).__name__}: {e}")
            return None

    @staticmethod
    async def harvest_feeds() -> None:
        """
        Fetch articles from all feeds and store summaries in the database.
        This is the main function that runs periodically to update the news database.
        """
        total_articles = 0
        successful_articles = 0
        failed_articles = 0

        try:
            with db_manager.get_cursor() as cursor:
                cursor.execute("SELECT country, url FROM feeds")
                feeds = cursor.fetchall()

            async with httpx.AsyncClient() as client:
                for feed_row in feeds:
                    feed_stats = await NewsFetcher._process_feed(client, feed_row)

                    total_articles += feed_stats['total']
                    successful_articles += feed_stats['successful']
                    failed_articles += feed_stats['failed']

            logger.info(
                f"✅ Harvest finished: {successful_articles}/{total_articles} articles stored, "
                f"{failed_articles} failed"
            )

            current_time = int(datetime.now(timezone.utc).timestamp())
            with db_manager.get_cursor() as cursor:
                cursor.execute(
                    "UPDATE meta SET val=? WHERE key='last_sync'",
                    (str(current_time),)
                )

        except Exception as e:
            logger.error(f"❌ Critical error during harvest: {type(e).__name__}: {e}")
            raise

    @staticmethod
    async def _process_feed(
            client: httpx.AsyncClient,
            feed_row: sqlite3.Row
    ) -> Dict[str, int]:
        """
        Process a single feed, fetching and summarizing all articles.
        Summaries are saved to the database immediately to keep concurrency high.

        Args:
            client: An active httpx AsyncClient for making requests
            feed_row: A database row containing feed information

        Returns:
            Dictionary with processing statistics
        """
        stats = {'total': 0, 'successful': 0, 'failed': 0, 'skipped': 0}

        try:
            feed_data = feedparser.parse(feed_row["url"])

            if hasattr(feed_data, 'bozo') and feed_data.bozo:
                logger.warning(f"⚠️ Feed has parsing issues: {feed_row['url']}")
                if hasattr(feed_data, 'bozo_exception'):
                    logger.warning(f"⚠️ Feed exception: {feed_data.bozo_exception}")

            total_entries = len(feed_data.entries)

            if total_entries == 0:
                logger.warning(f"⚠️ No entries found in feed: {feed_row['url']}")
                return stats

            for i, entry in enumerate(feed_data.entries, 1):
                stats['total'] += 1

                if not hasattr(entry, "link"):
                    stats['skipped'] += 1
                    continue

                if not hasattr(entry, "published_parsed"):
                    stats['skipped'] += 1
                    continue

                article_url = entry.link

                try:
                    published = datetime(
                        *entry.published_parsed[:6],
                        tzinfo=timezone.utc
                    )
                except (TypeError, ValueError):
                    stats['skipped'] += 1
                    continue

                # Check if the article already exists - use a readonly connection for better concurrency
                try:
                    with db_manager.get_cursor_with_retry(readonly=True) as cursor:
                        cursor.execute("SELECT id FROM news WHERE url = ?", (article_url,))
                        if cursor.fetchone():
                            stats['skipped'] += 1
                            continue
                except Exception as db_error:
                    logger.warning(f"⚠️ Database check failed for article {i}, continuing: {db_error}")

                rss_title = getattr(entry, 'title', '')
                rss_description = getattr(entry, 'description', '') or getattr(entry, 'summary', '')

                summary = await NewsFetcher.summarize_article(
                    client,
                    article_url,
                    title=rss_title,
                    description=rss_description
                )

                if not summary:
                    logger.warning(f"❌ Failed to get summary for article {i}: {article_url}")
                    stats['failed'] += 1
                    continue

                published_timestamp = int(published.timestamp())

                try:
                    with db_manager.get_cursor_with_retry(readonly=False) as cursor:
                        cursor.execute(
                            """
                            INSERT OR IGNORE INTO news
                                (title, description, url, published, country)
                            VALUES (?, ?, ?, ?, ?)
                            """,
                            (
                                summary["title"],
                                summary["description"],
                                article_url,
                                published_timestamp,
                                feed_row["country"],
                            )
                        )

                    stats['successful'] += 1

                except Exception as db_error:
                    logger.error(f"❌ Database error for article {i}: {db_error}")
                    stats['failed'] += 1
                    continue

                await asyncio.sleep(0.01)  # 10 ms delay to yield control

        except Exception as e:
            logger.error(f"❌ Error processing feed {feed_row['url']}: {type(e).__name__}: {e}")

        return stats


scheduler = AsyncIOScheduler(timezone="UTC")
scheduler.add_job(
    NewsFetcher.harvest_feeds,
    "interval",
    hours=CRON_HOURS,
    id="harvest"
)
scheduler.start()


# Pydantic models for API requests and responses
class CronSettings(BaseModel):
    """Settings for the cron job that harvests news."""
    hours: float


class FeedData(BaseModel):
    """Data for a news feed."""
    country: str
    url: str


class ModelStatus(BaseModel):
    """Status of the LLM model."""
    name: str
    status: str
    available_models: List[str]


class ErrorResponse(BaseModel):
    """Standard error response."""
    status: str
    message: str


class SuccessResponse(BaseModel):
    """Standard success response."""
    status: str


class TimestampResponse(BaseModel):
    """Response containing a timestamp."""
    ts: int


class HoursResponse(BaseModel):
    """Response containing the hours setting."""
    hours: float


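# Example request/response bodies for the models above (illustrative values):
#
#   POST /feeds          {"country": "DE", "url": "https://example.com/rss"}
#   POST /settings/cron  {"hours": 2}      -> {"hours": 2.0}
#   GET  /meta/last-sync                   -> {"ts": 1720000000}
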
async def get_db():
    """
    Dependency that provides a read-only database cursor with retry logic.

    Yields:
        A database cursor for executing SQL statements
    """
    with db_manager.get_cursor_with_retry(readonly=True) as cursor:
        yield cursor


async def get_db_write():
    """
    Dependency that provides a database cursor for write operations with retry logic.

    Yields:
        A database cursor for executing SQL statements
    """
    with db_manager.get_cursor_with_retry(readonly=False) as cursor:
        yield cursor


# API endpoints
@app.get("/news", response_model=List[Dict[str, Any]])
async def get_news(
        country: str = "DE",
        from_: str = "2025-07-01",
        to_: Optional[str] = None,
        db: sqlite3.Cursor = Depends(get_db)
):
    """
    Get news articles filtered by country and date range.
    Optimized for concurrent access while the scheduler is running.

    Args:
        country: Country code to filter by (default: "DE")
        from_: Start date in ISO format (default: "2025-07-01")
        to_: End date in ISO format (default: current date)
        db: Database cursor dependency

    Returns:
        List of news articles matching the criteria
    """
    if to_ is None:
        to_ = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    try:
        from_ts = int(datetime.fromisoformat(from_).timestamp())
        to_ts = int(datetime.fromisoformat(to_).timestamp())

        db.execute(
            """
            SELECT id, title, description, url, published, country, created_at
            FROM news
            WHERE country = ? AND published BETWEEN ? AND ?
            ORDER BY published DESC
            LIMIT 1000
            """,
            (country, from_ts, to_ts)
        )

        return [dict(row) for row in db.fetchall()]

    except ValueError:
        raise HTTPException(400, "Invalid date format. Use ISO format (YYYY-MM-DD)")
    except Exception as e:
        logger.error(f"❌ Error fetching news: {e}")
        raise HTTPException(
            500, "Internal server error while fetching news"
        )


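# Example request (assuming the default uvicorn address of 127.0.0.1:8000):
#
#   curl "http://127.0.0.1:8000/news?country=DE&from_=2025-07-01&to_=2025-07-31"
#
# returns a JSON array of rows from the news table, newest first.
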
@app.get("/feeds", response_model=List[Dict[str, Any]])
async def list_feeds(db: sqlite3.Cursor = Depends(get_db)):
    """
    List all registered news feeds.

    Args:
        db: Database cursor dependency

    Returns:
        List of feed objects with id, country, and url
    """
    try:
        db.execute("SELECT * FROM feeds ORDER BY country, url")
        return [dict(row) for row in db.fetchall()]
    except Exception as e:
        logger.error(f"❌ Error fetching feeds: {e}")
        raise HTTPException(
            500, "Internal server error while fetching feeds"
        )


@app.get("/meta/last-sync", response_model=TimestampResponse)
async def get_last_sync(db: sqlite3.Cursor = Depends(get_db)):
    """
    Get the timestamp of the last successful feed synchronization.

    Args:
        db: Database cursor dependency

    Returns:
        Object containing the timestamp as a Unix epoch
    """
    db.execute("SELECT val FROM meta WHERE key='last_sync'")
    row = db.fetchone()
    if row is None:
        return {"ts": int(time.time())}
    return {"ts": int(row["val"])}


@app.post("/feeds", response_model=SuccessResponse)
async def add_feed(
        feed: FeedData,
        db: sqlite3.Cursor = Depends(get_db_write)
):
    """
    Add a new news feed.

    Args:
        feed: Feed data with country and URL
        db: Database cursor dependency

    Returns:
        Success status
    """
    try:
        db.execute(
            "INSERT INTO feeds (country, url) VALUES (?, ?) "
            "ON CONFLICT (url) DO NOTHING",
            (feed.country, feed.url)
        )
        return {"status": "added"}
    except Exception as e:
        logger.error(f"❌ Error adding feed: {e}")
        raise HTTPException(
            500, "Internal server error while adding feed"
        )


@app.delete("/feeds", response_model=SuccessResponse)
async def delete_feed(
        url: str,
        db: sqlite3.Cursor = Depends(get_db_write)
):
    """
    Delete a news feed by URL.

    Args:
        url: URL of the feed to delete
        db: Database cursor dependency

    Returns:
        Success status
    """
    try:
        db.execute("DELETE FROM feeds WHERE url=?", (url,))
        return {"status": "deleted"}
    except Exception as e:
        logger.error(f"❌ Error deleting feed: {e}")
        raise HTTPException(
            500, "Internal server error while deleting feed"
        )


@app.get("/model/status", response_model=Union[ModelStatus, ErrorResponse])
async def get_model_status():
    """
    Check the status of the LLM model.

    Returns:
        Object containing model name, status, and available models,
        or an error response if the model service is unavailable
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{OLLAMA_HOST}/api/tags",
                timeout=OLLAMA_API_TIMEOUT_SECONDS
            )
            response.raise_for_status()

            models_data = response.json()
            models = models_data.get("models", [])

            model_available = any(
                model.get("name") == LLM_MODEL for model in models
            )

            return {
                "name": LLM_MODEL,
                "status": "ready" if model_available else "not available",
                "available_models": [model.get("name") for model in models]
            }
    except Exception as e:
        return {"status": "error", "message": str(e)}


@app.post("/sync", response_model=None)
async def manual_sync(db: sqlite3.Cursor = Depends(get_db)):
    """
    Manually trigger a feed synchronization.

    Args:
        db: Database cursor dependency

    Returns:
        Success status, or an error response if sync was triggered too recently
    """
    db.execute("SELECT val FROM meta WHERE key='last_sync'")
    row = db.fetchone()
    last_sync_ts = int(row["val"]) if row is not None else 0

    now = datetime.now(timezone.utc)
    last_sync_time = datetime.fromtimestamp(last_sync_ts, timezone.utc)

    if now - last_sync_time < timedelta(minutes=SYNC_COOLDOWN_MINUTES):
        return Response(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            content="Sync was triggered too recently. Please wait before triggering again."
        )

    try:
        task = asyncio.create_task(NewsFetcher.harvest_feeds())
        return {"status": "triggered", "task_id": id(task)}
    except Exception as e:
        raise HTTPException(
            500, f"Failed to trigger sync: {str(e)}"
        )


@app.get("/settings/cron", response_model=HoursResponse)
async def get_cron_schedule(db: sqlite3.Cursor = Depends(get_db)):
    """
    Get the current cron schedule for harvesting news.

    Args:
        db: Database cursor dependency

    Returns:
        Object containing the current hours setting
    """
    db.execute("SELECT val FROM settings WHERE key='cron_hours'")
    row = db.fetchone()

    if row is None:
        return {"hours": CRON_HOURS}

    try:
        hours = float(row["val"])
        return {"hours": hours}
    except (ValueError, TypeError):
        return {"hours": CRON_HOURS}


@app.post("/settings/cron", response_model=HoursResponse)
async def update_cron_schedule(data: CronSettings, db: sqlite3.Cursor = Depends(get_db_write)):
    """
    Update the cron schedule for harvesting news.

    Args:
        data: New cron settings with hours interval
        db: Database cursor dependency

    Returns:
        Object containing the updated hours setting
    """
    global CRON_HOURS

    hours = max(MIN_CRON_HOURS, data.hours)

    scheduler.get_job("harvest").reschedule(trigger=IntervalTrigger(hours=hours))

    if os.getenv("CRON_HOURS") is None:
        db.execute(
            "UPDATE settings SET val=? WHERE key='cron_hours'",
            (str(hours),)
        )

    CRON_HOURS = hours

    return {"hours": hours}


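# Example requests for the endpoints above (illustrative; assumes the API is
# served on 127.0.0.1:8000):
#
#   curl -X POST http://127.0.0.1:8000/sync
#   curl -X POST http://127.0.0.1:8000/settings/cron \
#        -H "Content-Type: application/json" -d '{"hours": 2}'
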
# Mount static frontend
frontend_path = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
    "frontend",
    "dist"
)
app.mount("/", StaticFiles(directory=frontend_path, html=True), name="static")
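

# Local development entry point - a minimal sketch, assuming uvicorn is
# installed. The import string used on the command line depends on the actual
# package layout, e.g.:
#
#   uvicorn backend.main:app --reload
#
if __name__ == "__main__":  # pragma: no cover
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)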