""" Owly News Summariser Backend This module provides a FastAPI application that serves as the backend for the Owly News Summariser. It handles fetching news from RSS feeds, summarizing articles using Ollama/qwen, and providing an API for the frontend to access the summarized news. The application uses SQLite for data storage and APScheduler for scheduling periodic news harvesting. """ import asyncio import json import os import sqlite3 from contextlib import contextmanager from datetime import datetime, timezone, timedelta from http.client import HTTPException from pathlib import Path from typing import Dict, List, Optional, Any, Union, Iterator, Tuple, TypedDict, cast import logging import feedparser import httpx from apscheduler.schedulers.asyncio import AsyncIOScheduler from fastapi import FastAPI, Response, status, Depends from fastapi.staticfiles import StaticFiles from pydantic import BaseModel # Constants DB_PATH = Path("owlynews.sqlite") OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://localhost:11434") MIN_CRON_HOURS = 0.1 DEFAULT_CRON_HOURS = float(os.getenv("CRON_HOURS", MIN_CRON_HOURS)) CRON_HOURS = max(MIN_CRON_HOURS, DEFAULT_CRON_HOURS) SYNC_COOLDOWN_MINUTES = 30 LLM_MODEL = "qwen2:7b-instruct-q4_K_M" LLM_TIMEOUT_SECONDS = 180 OLLAMA_API_TIMEOUT_SECONDS = 10 # Add logging configuration at the top of your file logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # FastAPI app initialization app = FastAPI( title="Owly News Summariser", description="API for the Owly News Summariser application", version="1.0.0" ) # Database schema definitions SCHEMA_SQL = [ """ CREATE TABLE IF NOT EXISTS news ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, description TEXT, url TEXT NOT NULL, published TEXT NOT NULL, country TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """, "CREATE INDEX IF NOT EXISTS idx_news_published ON news(published)", """ CREATE TABLE IF NOT EXISTS feeds ( id INTEGER PRIMARY KEY, country TEXT, url TEXT UNIQUE NOT NULL ) """, """ CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, val TEXT NOT NULL ) """, """ CREATE TABLE IF NOT EXISTS meta ( key TEXT PRIMARY KEY, val TEXT NOT NULL ) """ ] class DatabaseManager: """ Manages database connections and operations for the application. Provides methods for initializing the database, executing queries, and managing transactions. """ def __init__(self, db_path: Path): """ Initialize the database manager with the given database path. Args: db_path: Path to the SQLite database file """ self.db_path = db_path self._initialize_db() def _get_connection(self) -> sqlite3.Connection: """ Create a thread-safe database connection. Returns: An active SQLite connection """ conn = sqlite3.connect( self.db_path, check_same_thread=False, # Allow use across threads timeout=20.0 # Add timeout to prevent deadlocks ) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") return conn @contextmanager def get_cursor(self) -> Iterator[sqlite3.Cursor]: """ Context manager that provides a database cursor and handles commits and rollbacks. Yields: A database cursor for executing SQL statements """ conn = None try: conn = self._get_connection() cursor = conn.cursor() yield cursor conn.commit() except Exception as e: if conn: conn.rollback() raise e finally: if conn: conn.close() def _initialize_db(self) -> None: """ Initialize the database schema and default settings. Creates tables if they don't exist and inserts default values. """ logger.info("🗄️ Initializing database...") # Create schema with self.get_cursor() as cursor: for i, stmt in enumerate(SCHEMA_SQL): logger.debug(f"📝 Executing schema statement {i+1}/{len(SCHEMA_SQL)}") cursor.execute(stmt) # Add migration for description column if it doesn't exist try: cursor.execute("SELECT description FROM news LIMIT 1") logger.debug("✅ Description column exists") except sqlite3.OperationalError: # Column doesn't exist, add it logger.info("🔧 Adding missing description column to news table...") cursor.execute("ALTER TABLE news ADD COLUMN description TEXT") # Insert initial settings cursor.execute( "INSERT INTO settings VALUES (?, ?) ON CONFLICT (key) DO NOTHING", ("cron_hours", str(CRON_HOURS)) ) logger.debug("⚙️ Settings initialized") # Insert initial metadata cursor.execute( "INSERT INTO meta VALUES (?, ?) ON CONFLICT (key) DO NOTHING", ("last_sync", "0") ) logger.debug("📊 Metadata initialized") # Check current feed count cursor.execute("SELECT COUNT(*) as count FROM feeds") feed_count = cursor.fetchone()["count"] logger.info(f"📡 Current feeds in database: {feed_count}") # Seed feeds if none exist if feed_count == 0: logger.info("🌱 No feeds found, starting seeding process...") feeds_added = self._seed_feeds(cursor) # Pass the existing cursor # Verify seeding worked cursor.execute("SELECT COUNT(*) as count FROM feeds") new_feed_count = cursor.fetchone()["count"] logger.info(f"📡 Feeds after seeding: {new_feed_count}") else: logger.info("📡 Feeds already exist, skipping seeding") logger.info("✅ Database initialization complete") def _seed_feeds(self, cursor: sqlite3.Cursor) -> int: """ Seed the database with initial feeds from the seed_feeds.json file. Only runs if the feeds table is empty. Args: cursor: Database cursor to use for operations Returns: Number of feeds added """ logger.info("🌱 Seeding feeds from seed_feeds.json...") feeds_added = 0 try: seed_path = Path(__file__).with_name("seed_feeds.json") logger.debug(f"📁 Looking for seed file at: {seed_path}") if not seed_path.exists(): logger.error(f"❌ Seed file not found at: {seed_path}") return feeds_added with open(seed_path, "r") as f: seed_data = json.load(f) logger.debug(f"📄 Loaded seed data: {seed_data}") for country, urls in seed_data.items(): logger.info(f"🌍 Processing {len(urls)} feeds for country: {country}") for url in urls: try: cursor.execute( "INSERT INTO feeds (country, url) VALUES (?, ?) " "ON CONFLICT (url) DO NOTHING", (country, url) ) # Check if the insert actually added a row if cursor.rowcount > 0: feeds_added += 1 logger.debug(f"✅ Added feed: {url} ({country})") else: logger.debug(f"⏩ Feed already exists: {url} ({country})") except Exception as e: logger.error(f"❌ Failed to add feed {url}: {e}") logger.info(f"🌱 Seeding complete: {feeds_added} feeds added") except json.JSONDecodeError as e: logger.error(f"❌ Invalid JSON in seed_feeds.json: {e}") # Re-read file content for error reporting try: with open(seed_path, "r") as f: content = f.read() logger.error(f"📄 File content causing error: {content}") except: logger.error("📄 Could not re-read file for error reporting") except FileNotFoundError as e: logger.error(f"❌ Seed file not found: {e}") except Exception as e: logger.error(f"❌ Error seeding feeds: {e}") return feeds_added # Initialize database manager db_manager = DatabaseManager(DB_PATH) class ArticleSummary(TypedDict): """Type definition for article summary data returned from the LLM.""" title: str summary_de: str summary_en: str class NewsFetcher: """ Handles fetching and summarizing news articles from RSS feeds. Uses Ollama/qwen to generate summaries of articles. """ @staticmethod def build_prompt(url: str, title: str = "", description: str = "") -> str: """ Generate a prompt for the LLM to summarize an article. Args: url: Public URL of the article to summarize title: Article title from RSS feed (optional) description: Article description from RSS feed (optional) Returns: A formatted prompt string that instructs the LLM to generate a JSON response with title and summaries in German and English """ context_info = [] if title: context_info.append(f"Titel: {title}") if description: context_info.append(f"Beschreibung: {description}") context = "\n".join(context_info) if context_info else "Keine zusätzlichen Informationen verfügbar." return ( "### Aufgabe\n" f"Du sollst eine Nachricht basierend auf der URL und den verfügbaren Informationen zusammenfassen.\n" f"URL: {url}\n" f"Verfügbare Informationen:\n{context}\n\n" "### Regeln\n" "1. Nutze die verfügbaren Informationen (Titel, Beschreibung) und dein Wissen über die URL-Domain\n" "2. Falls keine ausreichenden Informationen vorliegen, erstelle eine plausible Zusammenfassung basierend auf der URL\n" "3. Gib ausschließlich **gültiges minifiziertes JSON** zurück – kein Markdown, keine Kommentare\n" "4. Struktur: {\"title\":\"…\",\"summary_de\":\"…\",\"summary_en\":\"…\"}\n" "5. title: Aussagekräftiger deutscher Titel (max 100 Zeichen)\n" "6. summary_de: Deutsche Zusammenfassung (max 160 Wörter)\n" "7. summary_en: Englische Zusammenfassung (max 160 Wörter)\n" "8. Kein Text vor oder nach dem JSON\n\n" "### Ausgabe\n" "Jetzt antworte mit dem JSON:" ) @staticmethod async def summarize_article( client: httpx.AsyncClient, url: str, title: str = "", description: str = "" ) -> Optional[ArticleSummary]: """ Generate a summary of an article using the LLM. Args: client: An active httpx AsyncClient for making requests url: URL of the article to summarize title: Article title from RSS feed description: Article description from RSS feed Returns: A dictionary containing the article title and summaries in German and English, or None if summarization failed """ logger.info(f"🤖 Starting article summarization for: {url}") logger.debug(f"📝 RSS Title: {title[:50]}..." if title else "📝 No RSS title") logger.debug(f"📄 RSS Description: {description[:100]}..." if description else "📄 No RSS description") prompt = NewsFetcher.build_prompt(url, title, description) payload = { "model": LLM_MODEL, "prompt": prompt, "stream": False, "temperature": 0.3, # Slightly increase creativity "format": "json" } try: logger.debug(f"📤 Sending request to Ollama API with model: {LLM_MODEL}") start_time = datetime.now() response = await client.post( f"{OLLAMA_HOST}/api/generate", json=payload, timeout=LLM_TIMEOUT_SECONDS ) elapsed_time = (datetime.now() - start_time).total_seconds() logger.info(f"⏱️ Ollama API response received in {elapsed_time:.2f}s") response.raise_for_status() result = response.json() logger.debug(f"📥 Raw Ollama response keys: {list(result.keys())}") # Parse the JSON string returned by the LLM llm_response = result["response"] logger.debug(f"🔍 LLM response type: {type(llm_response)}") logger.debug(f"🔍 LLM response preview: {str(llm_response)[:200]}...") if isinstance(llm_response, str): logger.debug("📋 Parsing JSON string response") summary_data = json.loads(llm_response) else: logger.debug("📋 Using direct dict response") summary_data = llm_response # Validate required fields required_fields = ["title", "summary_de", "summary_en"] missing_fields = [field for field in required_fields if field not in summary_data] if missing_fields: logger.warning(f"⚠️ Missing required fields in summary: {missing_fields}") return None # Log summary quality metrics title_len = len(summary_data.get("title", "")) de_words = len(summary_data.get("summary_de", "").split()) en_words = len(summary_data.get("summary_en", "").split()) logger.info(f"✅ Summary generated - Title: {title_len} chars, DE: {de_words} words, EN: {en_words} words") if de_words > 160 or en_words > 160: logger.warning(f"⚠️ Summary exceeds word limit - DE: {de_words}/160, EN: {en_words}/160") return cast(ArticleSummary, summary_data) except json.JSONDecodeError as e: logger.error(f"❌ JSON parsing error for {url}: {e}") logger.error(f"🔍 Raw response that failed to parse: {llm_response[:500]}...") return None except httpx.HTTPError as e: logger.error(f"❌ HTTP error for {url}: {e}") return None except Exception as e: logger.error(f"❌ Unexpected error summarizing {url}: {type(e).__name__}: {e}") return None @staticmethod async def harvest_feeds() -> None: """ Fetch articles from all feeds and store summaries in the database. This is the main function that runs periodically to update the news database. """ logger.info("🚀 Starting scheduled news harvest...") harvest_start_time = datetime.now() total_feeds = 0 total_articles = 0 successful_articles = 0 failed_articles = 0 try: # Get all feeds from the database with db_manager.get_cursor() as cursor: cursor.execute("SELECT country, url FROM feeds") feeds = cursor.fetchall() total_feeds = len(feeds) logger.info(f"📡 Found {total_feeds} feeds to process") # Process each feed async with httpx.AsyncClient() as client: for i, feed_row in enumerate(feeds, 1): logger.info(f"📰 Processing feed {i}/{total_feeds}: {feed_row['url']} ({feed_row['country']})") feed_stats = await NewsFetcher._process_feed(client, feed_row) total_articles += feed_stats['total'] successful_articles += feed_stats['successful'] failed_articles += feed_stats['failed'] logger.info(f"📊 Feed {i} complete: {feed_stats['successful']}/{feed_stats['total']} articles processed successfully") # Update last sync timestamp current_time = int(datetime.now(timezone.utc).timestamp()) with db_manager.get_cursor() as cursor: cursor.execute( "UPDATE meta SET val=? WHERE key='last_sync'", (str(current_time),) ) harvest_duration = (datetime.now() - harvest_start_time).total_seconds() logger.info(f"✅ News harvest completed in {harvest_duration:.2f}s") logger.info(f"📊 Final stats: {total_feeds} feeds, {successful_articles}/{total_articles} articles processed successfully") except Exception as e: logger.error(f"❌ Critical error during harvest: {type(e).__name__}: {e}") raise @staticmethod async def _process_feed( client: httpx.AsyncClient, feed_row: sqlite3.Row ) -> Dict[str, int]: """ Process a single feed, fetching and summarizing all articles. Args: client: An active httpx AsyncClient for making requests feed_row: A database row containing feed information Returns: Dictionary with processing statistics """ stats = {'total': 0, 'successful': 0, 'failed': 0, 'skipped': 0} try: logger.debug(f"🔍 Parsing RSS feed: {feed_row['url']}") feed_data = feedparser.parse(feed_row["url"]) if hasattr(feed_data, 'bozo') and feed_data.bozo: logger.warning(f"⚠️ Feed has parsing issues: {feed_row['url']}") if hasattr(feed_data, 'bozo_exception'): logger.warning(f"⚠️ Feed exception: {feed_data.bozo_exception}") total_entries = len(feed_data.entries) logger.info(f"📄 Found {total_entries} entries in feed") if total_entries == 0: logger.warning(f"⚠️ No entries found in feed: {feed_row['url']}") return stats for i, entry in enumerate(feed_data.entries, 1): stats['total'] += 1 logger.debug(f"📝 Processing article {i}/{total_entries}") # Skip entries without links or published dates if not hasattr(entry, "link"): logger.debug(f"⏩ Skipping entry {i}: no link") stats['skipped'] += 1 continue if not hasattr(entry, "published_parsed"): logger.debug(f"⏩ Skipping entry {i}: no published date") # TODO: change back to 0.5 stats['skipped'] += 1 continue article_url = entry.link logger.debug(f"🔗 Processing article: {article_url}") # Parse the published date try: published = datetime( *entry.published_parsed[:6], tzinfo=timezone.utc ) logger.debug(f"📅 Article published: {published}") except (TypeError, ValueError) as e: logger.debug(f"⏩ Skipping entry {i}: invalid date - {e}") stats['skipped'] += 1 continue # Check if article already exists with db_manager.get_cursor() as cursor: cursor.execute("SELECT id FROM news WHERE url = ?", (article_url,)) if cursor.fetchone(): logger.debug(f"⏩ Skipping entry {i}: article already exists") stats['skipped'] += 1 continue # Get article summary logger.debug(f"🤖 Requesting summary for article {i}") # Extract title and description from RSS entry rss_title = getattr(entry, 'title', '') rss_description = getattr(entry, 'description', '') or getattr(entry, 'summary', '') summary = await NewsFetcher.summarize_article( client, article_url, title=rss_title, description=rss_description ) if not summary: logger.warning(f"❌ Failed to get summary for article {i}: {article_url}") stats['failed'] += 1 continue published_timestamp = int(published.timestamp()) # Handle source field - it can be a string or dict source_value = entry.get("source", feed_row["url"]) if isinstance(source_value, dict): source_title = source_value.get("title", feed_row["url"]) else: source_title = source_value if source_value else feed_row["url"] logger.debug(f"💾 Storing article in database") # Store in database try: with db_manager.get_cursor() as cursor: cursor.execute( """ INSERT INTO news (title, description, url, published, country) VALUES (?, ?, ?, ?, ?) """, ( summary["title"], summary["summary_de"], article_url, published_timestamp, feed_row["country"], ) ) logger.info(f"✅ Successfully processed article {i}: {summary['title'][:50]}...") stats['successful'] += 1 except Exception as db_error: logger.error(f"❌ Database error for article {i}: {db_error}") stats['failed'] += 1 continue except Exception as e: logger.error(f"❌ Error processing feed {feed_row['url']}: {type(e).__name__}: {e}") logger.info(f"📊 Feed processing complete: {stats['successful']} successful, {stats['failed']} failed, {stats['skipped']} skipped out of {stats['total']} total") return stats # Initialize scheduler scheduler = AsyncIOScheduler(timezone="UTC") scheduler.add_job( NewsFetcher.harvest_feeds, "interval", hours=CRON_HOURS, id="harvest" ) print(f"Starting scheduler with {CRON_HOURS} hours interval") scheduler.start() print("Scheduler started") print(f"Next run: {scheduler.get_job('harvest').next_run_time}") # Pydantic models for API requests and responses class CronSettings(BaseModel): """Settings for the cron job that harvests news.""" hours: float class FeedData(BaseModel): """Data for a news feed.""" country: str url: str class ModelStatus(BaseModel): """Status of the LLM model.""" name: str status: str available_models: List[str] class ErrorResponse(BaseModel): """Standard error response.""" status: str message: str class SuccessResponse(BaseModel): """Standard success response.""" status: str class TimestampResponse(BaseModel): """Response containing a timestamp.""" ts: int class HoursResponse(BaseModel): """Response containing hours setting.""" hours: float # Dependency for getting a database cursor async def get_db(): """ Dependency that provides a database cursor. Yields: A database cursor for executing SQL statements """ with db_manager.get_cursor() as cursor: yield cursor # API endpoints @app.get("/news", response_model=List[Dict[str, Any]]) async def get_news( country: str = "DE", from_: str = "2025-07-01", to: str = datetime.now(timezone.utc).strftime("%Y-%m-%d"), db: sqlite3.Cursor = Depends(get_db) ): """ Get news articles filtered by country and date range. Args: country: Country code to filter by (default: "DE") from_: Start date in ISO format (default: "2025-07-01") to: End date in ISO format (default: current date) db: Database cursor dependency Returns: List of news articles matching the criteria """ try: datetime.fromisoformat(from_) datetime.fromisoformat(to) except ValueError: raise HTTPException(400, "Invalid date format") finally: db.execute( """ SELECT id, title, description, url, published, country, created_at FROM news WHERE country=? AND published BETWEEN ? AND ? ORDER BY published DESC """, (country, from_, to) ) return [dict(row) for row in db.fetchall()] @app.get("/meta/last-sync", response_model=TimestampResponse) async def get_last_sync(db: sqlite3.Cursor = Depends(get_db)): """ Get the timestamp of the last successful feed synchronization. Args: db: Database cursor dependency Returns: Object containing the timestamp as a Unix epoch """ db.execute("SELECT val FROM meta WHERE key='last_sync'") row = db.fetchone() return {"ts": int(row["val"])} @app.put("/settings/cron", response_model=HoursResponse) async def set_cron_schedule( data: CronSettings, db: sqlite3.Cursor = Depends(get_db) ): """ Update the cron schedule for harvesting news. Args: data: New cron settings with hours interval db: Database cursor dependency Returns: Object containing the updated hours setting """ # Ensure minimum interval hours = max(MIN_CRON_HOURS, data.hours) # Update scheduler scheduler.get_job("harvest").modify(trigger="interval", hours=hours) # Update database db.execute( "UPDATE settings SET val=? WHERE key='cron_hours'", (str(hours),) ) return {"hours": hours} @app.get("/feeds", response_model=List[Dict[str, Any]]) async def list_feeds(db: sqlite3.Cursor = Depends(get_db)): """ List all registered news feeds. Args: db: Database cursor dependency Returns: List of feed objects with id, country, and url """ db.execute("SELECT * FROM feeds ORDER BY country") return [dict(row) for row in db.fetchall()] @app.post("/feeds", response_model=SuccessResponse) async def add_feed( feed: FeedData, db: sqlite3.Cursor = Depends(get_db) ): """ Add a new news feed. Args: feed: Feed data with country and URL db: Database cursor dependency Returns: Success status """ db.execute( "INSERT INTO feeds (country, url) VALUES (?, ?) " "ON CONFLICT (url) DO NOTHING", (feed.country, feed.url) ) return {"status": "added"} @app.delete("/feeds", response_model=SuccessResponse) async def delete_feed( url: str, db: sqlite3.Cursor = Depends(get_db) ): """ Delete a news feed by URL. Args: url: URL of the feed to delete db: Database cursor dependency Returns: Success status """ db.execute("DELETE FROM feeds WHERE url=?", (url,)) return {"status": "deleted"} @app.get("/model/status", response_model=Union[ModelStatus, ErrorResponse]) async def get_model_status(): """ Check the status of the LLM model. Returns: Object containing model name, status, and available models, or an error response if the model service is unavailable """ try: async with httpx.AsyncClient() as client: # Get model information from Ollama response = await client.get( f"{OLLAMA_HOST}/api/tags", timeout=OLLAMA_API_TIMEOUT_SECONDS ) response.raise_for_status() models_data = response.json() models = models_data.get("models", []) # Check if the current model is available model_available = any( model.get("name") == LLM_MODEL for model in models ) return { "name": LLM_MODEL, "status": "ready" if model_available else "not available", "available_models": [model.get("name") for model in models] } except Exception as e: return {"status": "error", "message": str(e)} @app.post("/sync", response_model=None) async def manual_sync(db: sqlite3.Cursor = Depends(get_db)): """ Manually trigger a feed synchronization. Args: db: Database cursor dependency Returns: Success status or error response if sync was triggered too recently """ # Check when the last sync was performed db.execute("SELECT val FROM meta WHERE key='last_sync'") row = db.fetchone() last_sync_ts = int(row["val"]) # Enforce cooldown period now = datetime.now(timezone.utc) last_sync_time = datetime.fromtimestamp(last_sync_ts, timezone.utc) if now - last_sync_time < timedelta(minutes=SYNC_COOLDOWN_MINUTES): return Response( status_code=status.HTTP_429_TOO_MANY_REQUESTS, content=f"Sync too soon – wait {SYNC_COOLDOWN_MINUTES} min." ) # Trigger sync in background try: task = asyncio.create_task(NewsFetcher.harvest_feeds()) return {"status": "triggered", "task_id": id(task)} except Exception as e: raise HTTPException(500, f"Failed to trigger sync: {str(e)}") # Mount static frontend frontend_path = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "frontend", "dist" ) app.mount("/", StaticFiles(directory=frontend_path, html=True), name="static")